#!/usr/bin/perl
use strict;
use MIME::Base64;
use threads;
use Thread::Queue;
use Getopt::Long qw(:config bundling no_ignore_case no_autoabbrev passthrough);
use IO::Handle qw();
use List::Util qw(shuffle);
use Fcntl qw(:flock);
use POSIX;

# Command-line option values; getOption() binds most of these as GetOptions targets.
my ($SRC_HOST,$SRC_PORT,$SRC_USER,$SRC_PASSWD,$SRC_DATABASE,$DES_HOST,$DES_PORT,$DES_USER,$DES_PASSWD,$DES_DATABASE,
    $KEY,$SRC_MAPFILE,$BY_LEAF,$FORCE,$FORCE_OWNER,$MASTER_ONLY,@TABLE_ARRAY,@EX_TABLE_ARRAY,$TABLE_FILE,$EX_TABLE_FILE,$UNIGNORE_FILE,@SCHEMA_ARRAY,@EX_SCHEMA_ARRAY,
    $INCREMENT,$IGNORE_CHECK,$IGNORE_HBA,$NEED_DELETE,$NEED_TRUNCATE,$NEED_ANALYZE,$ENCODING,$BATCH_SIZE,$TRANSACTION,$COMPRESS,$GLOBAL_CONDITION,$LOG_PATH,$PARAMETER_FILE,$IS_HELP,$VERSION);
# Run-time state. $LOG_FILE_HANDLE is the log file written by logMessage()/closed by exitMain().
# NOTE(review): %HASH_REPLICATION_TABLE presumably holds results of $GET_REPLICATION_TABLE_SQL and
# $TASK_QUEUE/$MSG_QUEUE are presumably Thread::Queue objects -- confirm where they are filled.
my (%SRC_HOSTMAP,%HASH_REPLICATION_TABLE,$TASK_QUEUE,$MSG_QUEUE,$LOG_FILE_HANDLE);
# -B limits (max / default / min, as quoted in the help text); $SQL_BATCH's use is not visible here.
my ($BATCH_MAX,$BATCH_DEFAULT,$BATCH_MIN,$SQL_BATCH) = (20,6,1,300);
# Default value of --key (quoted in the help text).
my ($KEY_DEFAULT) = (20140825);
# NOTE(review): presumably segment counts of both clusters and the source GPHOME, discovered at run time.
my ($SRC_SEG_SIZE,$DES_SEG_SIZE,$SRC_GPHOME);
# Default --log-path (~/gpAdminLogs); $LAST_STAT_FILE_NAME starts empty.
my ($LOG_PATH_DEFAULT,$LAST_STAT_FILE_NAME) = ($ENV{"HOME"}."/gpAdminLogs","");
# Field (-F) and record (-R) separators that queryResult() passes to psql.
my ($SQL_DELIM,$RECORD_SPLIT) = (chr(3).chr(4).chr(8),chr(5).chr(6).chr(9).chr(10));
my (@TASK_THREAD,$MSG_THREAD,$LOCK_FILE_HANDLE);
# Formatted log lines buffered by logMessage() until the log file is ready.
my (@MESSAGE_CACHE);
my ($SRC_DATABASE_VERSION,$DES_DATABASE_VERSION);
# logMessage() state: 0 = cache lines, 1 = flush the cache then switch to 2, 2 = write straight to $LOG_FILE_HANDLE.
my ($WRITE_TO_FILE) = (0);
# Upper-cased SQL keywords; quote() double-quotes any identifier found here.
my %KEY_WORD_SET;
# Program name without its directory part, used in messages and the help text.
(my $CMD_NAME = $0) =~ s!.*/(.*)!$1!;
# Process id left-padded with zeros (longer ids keep only their last 6 digits); used in log prefixes.
my $MAIN_PID = substr("000000".$$,-6);

# Every keyword category except unreserved ('U'), upper-cased.
# NOTE(review): presumably loaded into %KEY_WORD_SET, which quote() consults -- confirm.
my $SQL_GET_KEY_WORD = q#SELECT upper(word) FROM pg_get_keywords() WHERE catcode <> 'U';#;
# Helper table gp_toolkit.gp_segment_config (randomly distributed), meant to hold one
# (content id, data directory, port) row per primary segment.
# Its layout is verified by the md5 of its column names in attnum order, which is
# compared with $RANDOM_TABLE_DDL_CHECK_MD5; $RANDOM_TABLE_DDL_SCRIPT (re)creates it.
my $RANDOM_TABLE_DDL_CHECK_SQL = qq{select md5(string_agg(attname,',' order by attnum)) from pg_attribute where attnum>0 and attrelid=
    (select oid from pg_class where relname='gp_segment_config' and relnamespace=(select oid from pg_namespace where nspname='gp_toolkit'));};
my $RANDOM_TABLE_DDL_CHECK_MD5 = "7a2c712df03bf33fdd5fea8f0a2ac8ed";
my $RANDOM_TABLE_DDL_SCRIPT = qq{drop table if exists gp_toolkit.gp_segment_config;
    create table gp_toolkit.gp_segment_config(database_content int,database_path varchar(128),database_port int)distributed randomly;};
# Content check of the helper table: number of segments, number of up primaries, primaries
# with no matching row (no_entry_number), and rows stored on a segment other than the content
# id they record (wrong_number). This variant reads data directories from pg_filespace_entry
# (pre-6 catalog); the _V6 variant uses gp_segment_configuration.datadir.
my $RANDOM_TABLE_RECORD_CHECK_SQL = qq{select (select count(*) from gp_dist_random('gp_id')) segment_number,
        count(*) all_number, count(*) filter(where sc.database_content is null) no_entry_number,
        count(*) filter(where sc.database_content<>sc.gp_segment_id) wrong_number
    from (select c.content,f.fselocation,c.port,c.hostname from gp_segment_configuration c,pg_filespace_entry f
        where c.dbid=f.fsedbid and c.status='u' and c.role='p' and c.content>=0 and f.fsefsoid=(select oid from pg_filespace where fsname='pg_system')
    ) md left join gp_toolkit.gp_segment_config sc
    on md.content=sc.database_content and md.fselocation=sc.database_path and md.port=sc.database_port;};
my $RANDOM_TABLE_RECORD_CHECK_SQL_V6 = qq{select (select count(*) from gp_dist_random('gp_id')) segment_number,
        count(*) all_number, count(*) filter(where sc.database_content is null) no_entry_number,
        count(*) filter(where sc.database_content <> sc.gp_segment_id) wrong_number
    from (select c.content,c.datadir,c.port,c.hostname from gp_segment_configuration c
        where c.status = 'u' and c.role = 'p' and c.content >= 0
    ) md left join gp_toolkit.gp_segment_config sc
    on md.content = sc.database_content and md.datadir = sc.database_path and md.port = sc.database_port;};
# Fills the helper table through a throw-away external web table executed on all segments.
# Each segment runs a utility-mode psql against itself to read its gp_contentid, then inserts
# (content id, $GP_SEG_DATADIR, $GP_SEG_PORT) locally. q{} keeps the shell variables unexpanded in Perl.
my $RANDOM_TABLE_RECORD_INSERT_SQL = q{drop external table if exists gp_toolkit.gp_segment_config_ext;
create external web table gp_toolkit.gp_segment_config_ext(code text)
execute 'C=`PGOPTIONS="-c gp_session_role=utility" PGPORT=$GP_SEG_PORT PGDATABASE=$GP_DATABASE psql -qtAXc "show gp_contentid"`;
`PGOPTIONS="-c gp_session_role=utility" PGPORT=$GP_SEG_PORT PGDATABASE=$GP_DATABASE psql -qtAXc "insert into gp_toolkit.gp_segment_config select $C,''$GP_SEG_DATADIR'',$GP_SEG_PORT"`;'
on all format 'text';
select count(*) from gp_toolkit.gp_segment_config_ext;
drop external table if exists gp_toolkit.gp_segment_config_ext;};

# Catalog queries. Schema and relation names are returned as base64 of textsend() with
# newlines stripped, so any identifier survives the psql field/record separators.

# Tables whose distribution policy type is 'r' (replicated): (schema, table).
my $GET_REPLICATION_TABLE_SQL = q{select replace(encode(textsend(n.nspname),'base64'),chr(10),''),replace(encode(textsend(c.relname),'base64'),chr(10),'')
    from pg_namespace n,pg_class c,gp_distribution_policy p where n.oid = c.relnamespace and c.oid = p.localoid AND policytype='r';};
# User tables: empty first column, then (schema, table). Covers ordinary non-external
# tables (relkind 'r', relstorage <> 'x') in user schemas or public, excluding temp schemas
# and partition children (so only top-level / non-partitioned tables).
my $GET_ALL_USER_TABLE_SQL = q{select '',replace(encode(textsend(n.nspname),'base64'),chr(10),''),replace(encode(textsend(c.relname),'base64'),chr(10),'')
    from pg_class c,pg_namespace n,gp_distribution_policy p
    where c.relnamespace=n.oid and c.oid=p.localoid and (c.relnamespace>16384 or n.nspname='public') and n.nspname not like E'pg\_temp\_%' and n.nspname not like E'pg\_toast\_temp\_%'
    and c.relkind='r' and relstorage<>'x' and c.oid not in(select parchildrelid from pg_partition_rule);};

# One row per (top-level table, deepest-level leaf partition); non-partitioned tables get NULL child columns.
# Columns: parent AO segment relname, parent schema/name (base64), parent storage ('h' heap / 'a' other),
# parent last CREATE/TRUNCATE/ALTER time, then the same five columns for the child.
# Order: partitioned tables first (grouped by parent oid), then by relpages weighted 2.5x for non-heap, largest first.
my $GET_PARENT_CHILD_LIST_SQL = q{with all_parent as (
    select c.oid,nspname,relname,decode(relstorage,'h','h','a') relstorage,relpages
    from pg_class c, pg_namespace n
        where c.relnamespace = n.oid
        and c.relkind = 'r' and c.relstorage <> 'x'
        and (n.oid > 16384 or n.nspname = 'public')
        and n.nspname not like E'pg\_temp\_%' and n.nspname not like E'pg\_toast\_temp\_%'
        and c.oid not in(select parchildrelid from pg_partition_rule)
), all_child as (
    select parrelid,parchildrelid,nspname,relname,relstorage from(
        select p.parrelid,pr.parchildrelid,p.parlevel,n.nspname,c.relname,decode(c.relstorage,'h','h','a') relstorage,
            rank() over(partition by parrelid order by parlevel desc) rank
        from pg_partition p, pg_partition_rule pr, pg_class c, pg_namespace n
        where pr.paroid = p.oid and pr.parchildrelid = c.oid and c.relnamespace = n.oid
        and c.relkind = 'r' and c.relstorage <> 'x'
        and (n.oid > 16384 or n.nspname = 'public')
        and n.nspname not like E'pg\_temp\_%' and n.nspname not like E'pg\_toast\_temp\_%'
    ) t where rank = 1
), last_oper as (
    select objid,max(statime) statime from pg_stat_last_operation
    where staactionname in('CREATE','TRUNCATE','ALTER')
    group by objid
), aosegname as(
    select c1.oid,c2.relname from pg_class c1,pg_appendonly a,pg_class c2
    where c1.oid = a.relid and a.segrelid = c2.oid
)
select a1.relname,
    replace(encode(textsend(x.nspname),'base64'),chr(10),''),
    replace(encode(textsend(x.relname),'base64'),chr(10),''),x.relstorage,p.statime::timestamp,
    a2.relname,
    replace(encode(textsend(y.nspname),'base64'),chr(10),''),
    replace(encode(textsend(y.relname),'base64'),chr(10),''),y.relstorage,c.statime::timestamp
from all_parent x
left join all_child y on x.oid = y.parrelid
left join last_oper p on x.oid = p.objid
left join last_oper c on y.parchildrelid = c.objid
left join aosegname a1 on x.oid = a1.oid
left join aosegname a2 on y.parchildrelid = a2.oid
order by decode(y.relname,null,0,1) desc,decode(y.relname,null,0,x.oid),x.relpages * decode(x.relstorage,'h',1.0,2.5) desc;};

# User views: (schema, view, base tables). The base tables are found through objects that depend
# internally ('i') on the view (NOTE(review): presumably its rewrite rule), followed through up to
# three levels of nested views. They are returned as a comma-separated list of "schema"."table".
# The list is NULL when the view depends on more than 8 tables.
my $SQL_GET_ALL_USER_VIEW = q{with dep_all as(
    select v.oid void,t.oid toid,t.relkind tkind,nt.nspname,t.relname
    from pg_class v,pg_namespace nv,pg_depend dv,pg_class t,pg_namespace nt,pg_depend dt
    where
        v.oid = dv.refobjid and v.relnamespace = nv.oid
        and dv.objid = dt.objid and dv.refobjid <> dt.refobjid
        and t.oid = dt.refobjid and t.relnamespace = nt.oid
        and v.relkind = 'v' and dv.refclassid = 1259
        and dt.refclassid = 1259
        and dv.deptype = 'i'
        and (nv.oid > 16384 or nv.nspname = 'public')
        and nv.nspname not like E'pg\_temp\_%' and nv.nspname not like E'pg\_toast\_temp\_%'
        group by 1,2,3,4,5
),dep_multi as(
    select void,nspname,relname from (
        select * from dep_all d
        union all
        select d.void,d2.toid,d2.tkind,d2.nspname,d2.relname from dep_all d left join dep_all d2 on d.toid = d2.void and d.tkind='v'
        union all
        select d.void,d3.toid,d3.tkind,d3.nspname,d3.relname from dep_all d left join dep_all d2 on d.toid = d2.void and d.tkind='v' left join dep_all d3 on d2.toid = d3.void and d2.tkind='v'
    ) x where tkind = 'r' group by 1,2,3
),dep as(
    select void,string_agg('"'||nspname||'"."'||relname||'"',',') deptables,count(*) nbr from dep_multi group by 1
)
select replace(encode(textsend(n.nspname),'base64'),chr(10),''),replace(encode(textsend(c.relname),'base64'),chr(10),''),
    replace(encode(textsend(dep.deptables),'base64'),chr(10),'') from pg_class c
    join pg_namespace n on c.relnamespace=n.oid
    left join dep on c.oid = dep.void and dep.nbr <= 8
    where (c.relnamespace>16384 or n.nspname='public') and n.nspname not like E'pg\_temp\_%' and n.nspname not like E'pg\_toast\_temp\_%'
    and c.relkind='v';};


# gp_toolkit.gp_transfer_execute(command varchar): a PL/Python helper that runs a shell command
# inside the database server and returns its output.
# The installed copy is validated in two ways. Its argument types (comma-joined) must equal
# $STR_EXECUTE_FUNCTION_ARG_CHECK, and md5(prosrc) must equal $MD5_EXECUTE_FUNCTION_BODY_CHECK.
# Any edit to the function body below changes that md5.
my $SQL_EXECUTE_FUNCTION_ARG_CHECK = q{select string_agg(t.typname,',' order by a.idx) from(
    select proargtypes typs,generate_series(0,array_upper(proargtypes,1)) idx from pg_proc
        where proname='gp_transfer_execute' and pronamespace=(select oid from pg_namespace where nspname='gp_toolkit')
    )a,pg_type t where t.oid=a.typs[a.idx];};
my $STR_EXECUTE_FUNCTION_ARG_CHECK = "varchar";
my $SQL_EXECUTE_FUNCTION_BODY_CHECK = q{select md5(prosrc) from pg_proc
    where proname='gp_transfer_execute' and pronamespace=(select oid from pg_namespace where nspname='gp_toolkit');};
my $MD5_EXECUTE_FUNCTION_BODY_CHECK = "5c9dd8af64ef05f79f06df02e39f3cf2";
# Python 2 code (the `commands` module, `except Exception, e`): a non-zero exit status is raised
# as a PL/Python error "<status>:<output>", otherwise the command output is returned.
my $SQL_EXECUTE_FUNCTION_DDL = q#create or replace function gp_toolkit.gp_transfer_execute(command varchar) returns text as $$
import commands
cmd_str=command
try:
    (cod,val)=commands.getstatusoutput(cmd_str)
    if cod!=0:
        plpy.error("%s:%s" % (cod,val))
    else:
        return val
except Exception, e:
    plpy.error(str(e))
$$ language plpythonu;
#;

# pg_hba.conf content granting "trust" to every user/database locally and from any IPv4/IPv6 address.
# NOTE(review): fully open access -- presumably applied to segments only for the transfer unless
# --ignore-hba is used; confirm how and when it is installed/reverted.
my $STR_SEG_PG_HBA = q^# "local" is for Unix domain socket connections only
local   all     all                             trust
# IPv4 local connections:
host    all     all     127.0.0.1/24            trust
host    all     all       0.0.0.0/0             trust
host    all     all           ::0/0             trust
^;

# gp_toolkit.gp_heap_table_stat(version, dbname, port, path, tbname): a set-returning PL/Python
# function listing the "date time" modification stamps of a heap table's data file and of its
# numbered extents (<file>.1, <file>.2, ...), read with `ls --full-time`.
# The installed copy is validated by its argument types ($HEAP_STAT_FUNCTION_ARG_CHECK_VALUE)
# and by md5(prosrc) == $HEAP_STAT_FUNCTION_SRC_CHECK_MD5; any edit to the body changes that md5.
my $HEAP_STAT_FUNCTION_ARG_CHECK_SQL = qq{select string_agg(t.typname,',' order by a.idx) from(
    select proargtypes typs,generate_series(0,array_upper(proargtypes,1)) idx from pg_proc
        where proname='gp_heap_table_stat' and pronamespace=(select oid from pg_namespace where nspname='gp_toolkit')
    )a,pg_type t where t.oid=a.typs[a.idx];};
my $HEAP_STAT_FUNCTION_ARG_CHECK_VALUE = "int4,varchar,int4,varchar,varchar";
my $HEAP_STAT_FUNCTION_SRC_CHECK_SQL = qq{select md5(prosrc) from pg_proc where proname='gp_heap_table_stat' and pronamespace=(select oid from pg_namespace where nspname='gp_toolkit');};
my $HEAP_STAT_FUNCTION_SRC_CHECK_MD5 = "424ee1fa4406bfaa4dab0e0404a05803";
# Python 2 body. tbname is "schema.table". Catalog lookups go through a utility-mode psql on `port`.
# The file path is built from the data directory `path` as follows:
#   - the default tablespace (oid 1663) uses base/<db oid>/<relfilenode>;
#   - otherwise, for version > 5, pg_tblspc/<tablespace oid>/GPDB_*/...;
#   - otherwise the filespace location found through gp_persistent_filespace_node.
my $HEAP_STAT_FUNCTION_DDL = q#CREATE OR REPLACE FUNCTION gp_toolkit.gp_heap_table_stat(version int,dbname varchar,port int,path varchar, tbname varchar) RETURNS SETOF varchar AS $$
import os
grid=tbname
(scma,rel)=grid.split(".",1)
def getSqlValue(sql):
    cmd="PGOPTIONS='-c gp_session_role=utility' psql -qtAXF '|' -v ON_ERROR_STOP=1 -d '"+dbname+"' -p "+str(port)+" 2>&1 <<_END_OF_SQL\n"+sql+"\n_END_OF_SQL\n"
    try:
        val=os.popen(cmd).read()
        return val.strip()
    except Exception, e:
        plpy.error(str(e))
try:
    relInfoSql="""select reltablespace,relfilenode from pg_class c,pg_namespace n
        where c.relnamespace = n.oid and n.nspname = '%s' and c.relname = '%s'""" % (scma,rel)
    cmdVal=getSqlValue(relInfoSql)
    if cmdVal=="":
        plpy.error("Table not exists %s" % (tbname))
    (relTSP,relFile)=cmdVal.split("|",1)
    filePath=""
    datOid = ""
    datInfoSql="""select oid,dattablespace from pg_database where datname = current_database()"""
    (datOid,datTSP)=getSqlValue(datInfoSql).split("|",1)
    if relTSP=="0":
        relTSP=datTSP
    if relTSP=="1663":
        filePath=path + "/base/" + datOid + "/" + relFile
    else:
        if version>5:
            filePath=path+"/pg_tblspc/"+relTSP+"/GPDB_*/"+datOid+"/"+relFile
        else:
            fspSql="""select n.location_1 from pg_tablespace t,pg_filespace f,gp_persistent_filespace_node n"""
            fspSql+=""" where t.spcfsoid = f.oid and f.oid = n.filespace_oid and t.oid = %s""" % (relTSP)
            filePath=getSqlValue(fspSql) + "/" + relTSP + "/" + datOid + "/" + relFile
    lsCmd="""if [ -f %s ];then ls --full-time %s;fi|awk '{print $6" "$7}'\n""" % (filePath,filePath)
    lsCmd+="""I=0;while true;do I=$((I+1));if [ -f %s.$I ];then ls --full-time %s.$I;else break;fi;done|awk '{print $6" "$7}'""" % (filePath,filePath)
    for tim in os.popen(lsCmd).read().strip().split("\n"):
        yield(tim)
except Exception, e:
    plpy.error(str(e))
$$ language plpythonu;
#;

# Bash helper script (only its text is defined here; installation and invocation are outside this view).
# It takes ONE argument made of 16 dot-separated fields:
#   GPHOME(base64).KEY.USER.PASS(base64).INDX.SIZE.TYPE.LPORT.DBNM.SCMA.REL.LOCK(base64).ENCD.SEED.COMP.WHERE(base64)
# It also reads "$KEY.info" from its own directory:
#   - line 1 supplies ip,port for TYPE "M";
#   - lines 2.. are content,ip,port records for the other type, visited every SIZE-th record starting
#     at (local gp_contentid + SEED) % SIZE.
# For each target it psql-COPYs "SCMA"."REL" (optionally filtered by WHERE) to stdout in one transaction after running LOCK:
#   - COMP "0" connects directly;
#   - otherwise the COPY runs over ssh on the remote host, piped through "$COMP -c -f -1" there
#     and through gunzip/unzstd locally.
# WHERE "TEST" does probing instead:
#   - with COMP not "0", it first checks passwordless ssh to every ip in $KEY.info;
#   - on the per-record path, it only checks that each remote instance reports the expected gp_contentid.
# Any stale psql tagged GPDBTRANSFER.<KEY>.<INDX>.<port> is killed before each COPY.
# NOTE: q^...^ does not interpolate, but "\\" still collapses to "\", so each "\\$" below reaches
# bash as "\$", i.e. a dollar sign that is left literal at the first level of shell expansion.
my $CMD_CLEVER_COMMAND = q^#!/bin/bash
cd $(cd "$(dirname "$0")"; pwd)
ARGS=$1
ARGV=(${ARGS//./ })
GPHOME=${ARGV[0]}
GPHOME=`echo "$GPHOME"|base64 -di`
KEY=${ARGV[1]}
USER=${ARGV[2]}
PASS=${ARGV[3]}
PASS=`echo "$PASS"|base64 -di`
INDX=${ARGV[4]}
SIZE=${ARGV[5]}
TYPE=${ARGV[6]}
LPORT=${ARGV[7]}
DBNM=${ARGV[8]}
SCMA=${ARGV[9]}
REL=${ARGV[10]}
LOCK=${ARGV[11]}
LOCK=`echo "$LOCK"|base64 -di`
ENCD=${ARGV[12]}
SEED=${ARGV[13]}
COMP=${ARGV[14]}
UNCOMP=""
if [ "$COMP" == "gzip" ];then UNCOMP="gunzip";elif [ "$COMP" == "zstd" ];then UNCOMP="unzstd";fi
WHERE=${ARGV[15]}
WHERE=`echo "$WHERE"|base64 -di`
NMBR=`cat $KEY.info|wc -l`
NMBR=$((NMBR-1))
LCNTT=`PGOPTIONS='-c gp_session_role=utility' PGUSER=$USER psql -qtAX -p $LPORT -d template1 -c "show gp_contentid"`
LCNTT=$(((LCNTT+SEED)%SIZE))
COPYQ="COPY (SELECT * FROM \"$SCMA\".\"$REL\"$WHERE) TO STDOUT;"
if [ "" == "$WHERE" ];then
    COPYQ="COPY \"$SCMA\".\"$REL\" TO STDOUT;"
fi
if [[ "TEST" == "$WHERE" && "$COMP" != "0" ]];then
    for RIP in `cat $KEY.info|awk -F ',' '{print \\$2}'|sort|uniq`;do
        MSG=`ssh $USER@$RIP -T "echo 1"`
        ERR=$?;if [[ $ERR -ne 0 || $MSG -ne 1 ]];then exit 2;fi
    done
fi
if [ "M" == "$TYPE" ];then
    RINFO=`cat $KEY.info|sed -n 1p`
    RARRAY=(${RINFO//,/ })
    RIP=${RARRAY[1]}
    RPORT=${RARRAY[2]}
    if [ "$COMP" == "0" ];then
ps ax|grep -w 'psql'|grep -w "GPDBTRANSFER.$KEY.$INDX.$LPORT"|awk '{if($1!='$$'){system("kill -9 "$1)}}' >/dev/null 2>&1
PGOPTIONS="-c client_encoding=$ENCD" PGAPPNAME="GPDBTRANSFER.$KEY.$INDX" PGUSER=$USER PGPASSWORD=$PASS psql -qtAX -v ON_ERROR_STOP=1 -v "ID=GPDBTRANSFER.$KEY.$INDX.$LPORT" -h $RIP -p $RPORT -d $DBNM <<END_OF_PSQL
BEGIN;
$LOCK;
$COPYQ
END;
END_OF_PSQL
        ERR=$?;if [ $ERR -ne 0 ];then exit $ERR;fi
    else
set -o pipefail
ssh $USER@$RIP -T <<END_OF_OUTER_SHELL |$UNCOMP
ps ax|grep -w 'psql'|grep -w "GPDBTRANSFER.$KEY.$INDX.$RPORT"|awk '{if(\\$1!='\\$\\$'){system("kill -9 "\\$1)}}' >/dev/null 2>&1
. $GPHOME/greenplum_path.sh
set -o pipefail
PGOPTIONS="-c client_encoding=$ENCD" PGAPPNAME="GPDBTRANSFER.$KEY.$INDX" PGUSER=$USER PGPASSWORD=$PASS psql -qtAX -v ON_ERROR_STOP=1 -v "ID=GPDBTRANSFER.$KEY.$INDX.$RPORT" -p $RPORT -d $DBNM <<END_OF_INNER_SHELL |$COMP -c -f -1
BEGIN;
$LOCK;
$COPYQ
END;
END_OF_INNER_SHELL
ERR=\\$?;if [ \\$ERR -ne 0 ];then exit \\$ERR;fi
END_OF_OUTER_SHELL
        ERR=$?;if [ $ERR -ne 0 ];then exit $ERR;fi
    fi
else
    while [ $LCNTT -lt $NMBR ];do
        LINE=$((LCNTT+2))
        RINFO=`cat $KEY.info|sed -n ${LINE}p`
        RARRAY=(${RINFO//,/ })
        RCNTT=${RARRAY[0]}
        RIP=${RARRAY[1]}
        RPORT=${RARRAY[2]}
        if [ "TEST" == "$WHERE" ];then
            CCNTT=`PGOPTIONS='-c gp_session_role=utility' PGUSER=$USER psql -qtAX -h $RIP -p $RPORT -d $DBNM -c "show gp_contentid"`
            ERR=$?;if [[ $ERR -ne 0 || "$CCNTT" != "$RCNTT" ]];then exit 1;fi
        else
            if [ "$COMP" == "0" ];then
ps ax|grep -w 'psql'|grep -w "GPDBTRANSFER.$KEY.$INDX.$LPORT"|awk '{if($1!='$$'){system("kill -9 "$1)}}' >/dev/null 2>&1
PGOPTIONS="-c gp_session_role=utility -c client_encoding=$ENCD" PGAPPNAME="GPDBTRANSFER.$KEY.$INDX" PGUSER=$USER PGPASSWORD=$PASS psql -qtAX -v ON_ERROR_STOP=1 -v "ID=GPDBTRANSFER.$KEY.$INDX.$LPORT" -h $RIP -p $RPORT -d $DBNM <<END_OF_PSQL
BEGIN;
$LOCK;
$COPYQ
END;
END_OF_PSQL
                ERR=$?;if [ $ERR -ne 0 ];then exit $ERR;fi
            else
set -o pipefail
ssh $USER@$RIP -T <<END_OF_OUTER_SHELL |$UNCOMP
ps ax|grep -w 'psql'|grep -w "GPDBTRANSFER.$KEY.$INDX.$RPORT"|awk '{if(\\$1!='\\$\\$'){system("kill -9 "\\$1)}}' >/dev/null 2>&1
. $GPHOME/greenplum_path.sh
set -o pipefail
PGOPTIONS="-c gp_session_role=utility -c client_encoding=$ENCD" PGAPPNAME="GPDBTRANSFER.$KEY.$INDX" PGUSER=$USER PGPASSWORD=$PASS psql -qtAX -v ON_ERROR_STOP=1 -v "ID=GPDBTRANSFER.$KEY.$INDX.$RPORT" -p $RPORT -d $DBNM <<END_OF_INNER_SHELL |$COMP -c -f -1
BEGIN;
$LOCK;
$COPYQ
END;
END_OF_INNER_SHELL
ERR=\\$?;if [ \\$ERR -ne 0 ];then exit \\$ERR;fi
END_OF_OUTER_SHELL
                ERR=$?;if [ $ERR -ne 0 ];then exit $ERR;fi
            fi
        fi
        LCNTT=$((LCNTT+SIZE))
    done
fi
^;
############################################################
# Usage text (presumably printed for -h/--help).
# qq() interpolates $CMD_NAME, $KEY_DEFAULT and the $BATCH_* limits. So a literal "@" must stay
# escaped and every "(" / ")" in the text must stay balanced, or the string ends early.
my $HELP_MESSAGE = qq(COMMAND NAME: $CMD_NAME
Transfer data between Greenplum databases using external execute tables.

Developed by Miao Chen

Work Email:
michen\@pivotal.io
Private Email:
miaochen\@mail.ustc.edu.cn

************************************************************************************************
SYNOPSIS
************************************************************************************************
$CMD_NAME --src-host hostname
    [--src-port database port]
    [--src-user database user]          [base64]
    [--src-passwd database password]    [base64]
    --src-database database name
    --des-host hostname
    [--des-port database port]
    [--des-user database user]         [base64]
    [--des-passwd database password]   [base64]
    --des-database database name
    [--key identity number]
    --src-mapfile filename
    [--by-leaf]
    [--force]
    [--owner owner]
    [--from-master]
    [-t <schema.relation> [-t <schema.relation>] ...]
    [-T <schema.relation> [-T <schema.relation>] ...]
    [-f file include table name]
    [-F file include table name]
    [--unignore file include table name]
    [-s <schema> [-s <schema>] ...]
    [-S <schema> [-S <schema>] ...]
    [--increment]
    [--ignore-check]
    [--ignore-hba]
    [--delete]
    [--truncate]
    [--analyze]
    [--encoding encoding]
    [-B batch_size]
    [--transaction]
    [--compress method]
    [--where condition]
    [--log-path directory]
    [--parameter-file | --pf filename]
    [-h|--help]
    [--version]

For options marked [base64], the value may be given base64-encoded.
Prefix the encoded value with base64, for example:
src-passwd=base64Z3BhZG1pbg==
means:
src-passwd=gpadmin
*****************************************************
DESCRIPTION
*****************************************************
The $CMD_NAME utility transfers data in tables or views from a source database to a destination database (Greenplum Database).
When you start $CMD_NAME, the utility exports from the source database and, at the same time, imports into the destination database.

$CMD_NAME runs in parallel.
Each table transfer runs on all segment hosts at once.
At any time, several tables or views are being transferred.

Note:
The source data can be a table or a view, but the destination can only be a table.
*****************************************************
OPTIONS
*****************************************************

--src-host <source cluster master host>

  Source database hostname or IP address.
  This option is required.

  If you specify a hostname, make sure the hostname can be resolved in the current environment.
  The utility will not translate it to an IP address using --src-mapfile. It's best to use an IP address.

  eg.
  --src-host src_master
  --src-host 172.28.4.250

--src-port <source cluster master port>

  Source database port number. If not specified, the default is 5432.
  eg.
  --src-port 5433

--src-user <source cluster user name>

  User name that is used to connect to the source database. If not specified, the default is gpadmin.
  eg.
  --src-user src_user

--src-passwd <source cluster user password>

  User password that is used to connect to the source database.
  If not specified, you should make sure the connection is trusted.
  eg.
  --src-passwd password

--src-database <database>

  Source database.
  This option is required.
  If you do not specify the -t and -f options, all user tables in this database will be transferred.
  eg.:
  --src-database src_database

--des-host <destination cluster master host>

  Destination database hostname or IP address.
  This option is required.
  eg.
  --des-host des_master
  --des-host 172.28.4.250

--des-port <destination cluster master port>

  Destination database port number. If not specified, the default is 5432.
  eg.
  --des-port 5433

--des-user <destination cluster user name>

  User name that is used to connect to the destination database. If not specified, the default is gpadmin.
  eg.
  --des-user des_user

--des-passwd <destination cluster user password>

  User password that is used to connect to the destination database.
  If not specified, you should make sure the connection is trusted.
  eg.
  --des-passwd password

--des-database <database>

  Destination database.
  This option is required.
  eg.:
  --des-database des_database

--key <identity number>

  Identity number used to lock resources between the two clusters.
  Default is $KEY_DEFAULT
  eg.
  --key 20140825

--src-mapfile <source cluster segment hostname and ipaddress map file>

  File that lists source segment host names and IP addresses.
  This option is required.
  If not all segment hosts are listed, $CMD_NAME may encounter an error and quit.
  Each line of the file contains a source segment address name and the IP address separated by a comma:
  <hostname>,<IPaddress>
  This example lists 4 hosts, each with two addresses.
  sdw1-1,172.28.4.1
  sdw1-2,172.28.8.1
  sdw2-1,172.28.4.2
  sdw2-2,172.28.8.2
  sdw3-1,172.28.4.3
  sdw3-2,172.28.8.3
  sdw4-1,172.28.4.4
  sdw4-2,172.28.8.4

  You should specify the mapping for every segment host address in the source database table gp_segment_configuration:
  select distinct address from gp_segment_configuration where content>-1
  If you can ensure that all the destination segments can resolve all the source segment addresses,
  you can specify an empty file, but this is not recommended.

--by-leaf

  Transfer leaf tables only: the tables transferred cannot be parent tables.
  In other words, the tables must not have child tables.

--force

  Specify this option to force recreating the destination table and transferring when the table definitions differ.

--owner

  Specify the owner to set when the --force option recreates a destination table.

--from-master

  Specify this option to use only the master-to-master data transfer path, for networks with restrictions.
  For example, when the network is accessible only between the two masters.

-t <schema.tablename>

  A table from the source database to transfer.
  The fully qualified table name must be specified.
  This option can be specified multiple times to include multiple tables.
  If the source table does not exist, $CMD_NAME ignores it and logs a notice message.

  eg.
  -t myschema.src_table

-T <schema.relation>

  Do not transfer this table.
  This option can be specified multiple times to exclude multiple tables.
  eg.:
  -T public.ex_a -T public.ex_b

-f <table-file>

  The location and name of a file containing a list of fully qualified table names to copy from the source database.
  In the text file, you should specify a single fully qualified table per line.
  If the source table does not exist, $CMD_NAME ignores it and logs a notice message.
  In this file you can also specify views, not only tables.
  And, if you specify a view, you should specify the destination qualified table name in the format:
  src_schema.src_relation[=>des_schema.des_relation][;condition];
  This example lists 4 tables or views to be transferred.
  public.customer;time>='2016-01-01'
  public.customer_v=>public.customer_branck;district_code='021'
  myschema.orders
  his.orders_his

-F <table-file>

  The location and name of a file containing a list of fully qualified table names to be excluded.
  In the text file, you should specify a single fully qualified table per line.
  If the table does not exist, $CMD_NAME ignores it.
  You should specify the qualified table name in the format:
  schema.relation

--unignore <table-file>

  The location and name of a file containing a list of fully qualified table names to copy from the source database, ignoring the increment check.
  In the text file, you should specify a single fully qualified table per line.
  You should specify the qualified table name in the format:
  schema.relation

-s <schema name>

  Transfer tables in this schema.
  This option can be specified multiple times to include multiple schemas.
  eg.:
  -s public -s myschema

-S <schema name>

  Do not transfer tables in this schema.
  This option can be specified multiple times to exclude multiple schemas.
  eg.:
  -S public -S myschema

--increment

   Transfer will identify whether a table needs to be transferred again.
   If a table has not changed since the last transfer, it is skipped.
   The change tags are saved in a file under the --log-path directory; please ensure that parameter does not change.
   Also, to use this feature, please ensure HOST & PORT & DATABASE do not change for both the source and destination clusters.

--ignore-check

  Transfer will skip the relation definition check between the source and destination clusters.
  It is very useful for cluster migration to improve performance, if the two clusters have the same DDL and do not need to be checked.
  Especially for clusters that have catalog performance issues.
  If you specify this parameter, the --force parameter will not work; only use this parameter for migration between clusters with the same DDL.

--ignore-hba

  Transfer will skip the pg_hba.conf modification.
  But you should make sure all the destination instances can access all the source instances with psql without a password.
  Use this parameter only when you want to control the pg_hba.conf configuration yourself.

--delete

  Specify this option to delete data from the table in the destination database.
  The delete condition is specified by the same option --where.
  If you do not specify the --where option and you specify this parameter, the effect is the same as --truncate.

--truncate

  Specify this option to truncate the table in the destination database.

--analyze

  After loading data, analyze the destination table to collect statistics.

--encoding <encoding>

  Transfer data with this encoding. The default is UTF8.

-B <batch_size>

  Sets the maximum number of tables that $CMD_NAME concurrently copies to the destination database.
  If not specified, the default is $BATCH_DEFAULT.
  The max is $BATCH_MAX.
  The min is $BATCH_MIN.

--transaction

  Specify this option to run the TRUNCATE and LOAD operations in one transaction.
  For versions less than 6, different leaf partitions may need other locks, so for those versions, when a transaction is used, partitioned tables cannot be loaded concurrently.

--compress <method>

  Specify this option to enable stream compression between the source cluster and the destination cluster.
  If you use the compress option, make sure all destination segments can ssh to all source segments (including the master) without a password.
  Currently the gzip and zstd compression methods are supported.
  Make sure all the source and destination hosts have the compression package installed.

  eg.
  --compress gzip
  --compress zstd

--where <condition>

  Specify this option to filter data from the source table.

--log-path <directory>

  Specify the $CMD_NAME log file directory.
  If not specified, the default is ~/gpAdminLogs.

--parameter-file | --pf <parameter_file>

  Specify the parameter file.
  Format:
  name=value
  eg.:

    src-host      = 192.168.0.1
    src-port      = 5432
    src-user      = gpadmin
    src-passwd    = gpadmin
    src-database  = src
    des-host      = 192.168.0.1
    des-port      = 5432
    des-user      = gpadmin
    des-passwd    = gpadmin
    des-database  = des
#   key           = 20140825
    src-mapfile   = src-mapfile
#   by-leaf
#   force
#   owner         = gpuser
#   from-master
#   t             = schema.relation
#   t             = schema.relation
#   T             = schema.relation
#   T             = schema.relation
#   f             = filename
#   F             = filename
#   s             = schema
#   s             = schema
#   S             = schema
#   S             = schema
#   increment
#   ignore-check
#   delete
#   truncate
#   analyze
#   encoding      = UTF8
#   B             = 6
#   transaction
#   compress      = gzip
#   compress      = zstd
#   where         =
#   log-path      = /home/gpadmin/gpAdminLogs

-h|--help

  Displays the online help.

--version

  Displays the command version.
);

sub printMessage{
    # Format and print one message.
    #   $flag    - "RAW" prints $message verbatim; anything else adds the
    #              "<YYYYmmdd:HH:MM:SS>.<pid>-[<flag>]-:" prefix and a trailing newline.
    #   $message - text to print.
    # "ERROR" goes to STDERR, everything else to STDOUT. Returns the text actually printed.
    my ($flag,$message) = @_;
    my $line = $message;
    unless("RAW" eq $flag){
        my $stamp = strftime("%Y%m%d:%H:%M:%S.",localtime).$MAIN_PID;
        $line = $stamp."-[".$flag."]-:".$message."\n";
    }
    my $target = ("ERROR" eq $flag) ? \*STDERR : \*STDOUT;
    print {$target} $line;
    return $line;
}
sub logMessage{
    # Print a message via printMessage() and record it for the log file.
    # $WRITE_TO_FILE drives the behaviour:
    #   1    -> the log file has just become available: flush @MESSAGE_CACHE, then switch to 2;
    #   2    -> write straight to $LOG_FILE_HANDLE;
    #   else -> the log file is not ready yet, so cache the formatted line.
    my ($flag,$message) = @_;
    my $formatted = printMessage($flag,$message);
    if($WRITE_TO_FILE != 1 && $WRITE_TO_FILE != 2){
        return push @MESSAGE_CACHE,$formatted;
    }
    if($WRITE_TO_FILE == 1){
        print {$LOG_FILE_HANDLE} $_ for @MESSAGE_CACHE;
        @MESSAGE_CACHE = ();
        $WRITE_TO_FILE = 2;
    }
    print {$LOG_FILE_HANDLE} $formatted;
}
sub exitMain{
    # Terminate the program with status $code, closing the log file first if one was opened.
    my ($code) = @_;
    close($LOG_FILE_HANDLE) if defined $LOG_FILE_HANDLE;
    exit $code;
}
sub errorMessage{
    # Fatal error: log $message at ERROR level, print a one-line usage hint on STDERR, exit with status 1.
    my ($message) = @_;
    logMessage("ERROR",$message);
    my $usage = "Usage: ".$CMD_NAME." [-h|--help] [options]\n";
    print STDERR $usage;
    exitMain(1);
}
sub trim{
    # Return a copy of the argument with leading and trailing whitespace (including newlines) removed.
    my ($text) = @_;
    $text =~ s/^\s+//;
    $text =~ s/\s+$//;
    return $text;
}
sub encode{
    # Base64-encode the argument as a single unbroken line (no embedded or trailing newline).
    # An empty eol argument makes encode_base64 skip line breaks altogether.
    my ($plain) = @_;
    return encode_base64($plain, "");
}
sub decode{
    # Decode a base64 string (such as one produced by encode()) back to its original bytes.
    my ($encoded) = @_;
    my $plain = decode_base64($encoded);
    return $plain;
}
sub quote{
    # Render an identifier for use in SQL.
    # Embedded double quotes are doubled. The result is wrapped in double quotes unless it is a
    # plain lower-case identifier ([a-z][a-z0-9_]*) whose upper-cased form is not in %KEY_WORD_SET.
    my ($ident) = @_;
    $ident =~ s/"/""/g;
    # Anchor with \z rather than $: "$" also matches before a trailing newline, which let an
    # identifier such as "name\n" through unquoted and produced broken SQL.
    if($ident !~ /^[a-z][a-z0-9_]*\z/ || exists $KEY_WORD_SET{uc($ident)}){
        $ident = '"'.$ident.'"';
    }
    return $ident;
}
sub illegal{
    # Return 1 if any argument contains a non-word character (\W, i.e. outside letters, digits
    # and "_"); otherwise return 0, including when called with no arguments.
    my $offenders = grep { /\W/ } @_;
    return $offenders ? 1 : 0;
}
sub escape{
    # Make CR, LF and TAB visible: each is replaced by a backslash and the letter r, n or t.
    # Other characters, backslashes included, are left untouched.
    my ($text) = @_;
    my %visible_for = ("\r" => '\r', "\n" => '\n', "\t" => '\t');
    $text =~ s/([\r\n\t])/$visible_for{$1}/g;
    return $text;
}
sub readLineFromFile{
    # Read $file_path and return its lines, trimmed, skipping blank lines and lines starting
    # with "#". A missing or unreadable file is fatal (errorMessage exits).
    my ($file_path) = @_;
    if(!-e $file_path){
        errorMessage("No file exists named: $file_path");
    }
    # A lexical handle instead of the global bareword FILE: it cannot clash with any other
    # FILE handle in the program, and it is released when it goes out of scope.
    my $fh;
    if(!open($fh,"<",$file_path)){
        errorMessage("Can't open file: $file_path: $!");
    }
    my @lines_array = ();
    while(my $line = <$fh>){
        $line = trim($line);
        next if "" eq $line || $line =~ /^#/;
        push @lines_array,$line;
    }
    close($fh);
    return @lines_array;
}
sub queryResult{
    # Run $query_sql through psql against the source ("SRC") or destination ("DES") cluster.
    # $return_flag selects the result shape:
    #   "CV"     -> (exit code, whole output joined with "\n"); never aborts
    #   "CR"     -> (exit code, raw output records) on failure, (0, rows) on success
    #   "Scalar" -> output records joined with "\n"
    #   other    -> list of rows, each an array ref of fields split on $SQL_DELIM
    # Except for "CV" (and "CR" on failure), a non-zero psql exit status aborts via errorMessage().
    my ($src_or_des,$query_sql,$return_flag) = @_;
    # Single-quote a value for the shell. Each embedded ' becomes '\'' so a host, database,
    # user or password containing a quote can neither break nor inject into the command line.
    # Values without quotes produce exactly the previous '...' text.
    my $shell_quote = sub {
        my ($value) = @_;
        $value =~ s/'/'\\''/g;
        return "'".$value."'";
    };
    my $CMDS = "PGAPPNAME=gpdbtransfer PGOPTIONS='-c client_encoding=UTF8".($NEED_ANALYZE ? "" : " -c gp_autostats_mode=NONE")."' ";
    if("SRC" eq $src_or_des){
        $CMDS = $CMDS."PGHOST=".$shell_quote->($SRC_HOST)." PGDATABASE=".$shell_quote->($SRC_DATABASE)." PGPORT=$SRC_PORT ";
        $CMDS = $CMDS.("" ne $SRC_USER ? "PGUSER=".$shell_quote->($SRC_USER)." " : "").("" ne $SRC_PASSWD ? "PGPASSWORD=".$shell_quote->($SRC_PASSWD)." " : "");
    }elsif("DES" eq $src_or_des){
        $CMDS = $CMDS."PGHOST=".$shell_quote->($DES_HOST)." PGDATABASE=".$shell_quote->($DES_DATABASE)." PGPORT=$DES_PORT ";
        $CMDS = $CMDS.("" ne $DES_USER ? "PGUSER=".$shell_quote->($DES_USER)." " : "").("" ne $DES_PASSWD ? "PGPASSWORD=".$shell_quote->($DES_PASSWD)." " : "");
    }
    # psql gets control-character record (-R) and field (-F) separators (presumably chosen because
    # they are unlikely to appear in data). $/ is set to the record separator so readpipe below
    # splits the output into records.
    local $/ = $RECORD_SPLIT;
    $CMDS = $CMDS."psql -R '$/' -tAXF '$SQL_DELIM' -v ON_ERROR_STOP=1 2>&1 <<'END_OF_SQL'\n";
    $CMDS = $CMDS.$query_sql."\n";
    $CMDS = $CMDS."END_OF_SQL\n";
    my @result = readpipe($CMDS);
    my $return_code = $? >> 8;
    chomp(@result);
    local $/ = chr(10);
    # NOTE(review): the last record presumably ends with psql's trailing newline rather than the separator.
    chomp($result[-1]) if (@result > 0);
    return ($return_code,join("\n",@result)) if ("CV" eq $return_flag);
    return ($return_code,@result) if ("CR" eq $return_flag && $return_code != 0);
    errorMessage(join("\n",@result)) if ($return_code);
    return join("\n",@result) if ("Scalar" eq $return_flag);
    my @return_list = ();
    for my $row(@result){
        push @return_list,[split(/$SQL_DELIM/,$row)];
    }
    return (0,@return_list) if ("CR" eq $return_flag);
    return @return_list;
}
# Parse command-line options into the global option variables. Handles
# --help/--version, then fills in any option not given on the command line
# from the "key=value" lines of --parameter-file (command line wins).
# Unknown arguments are fatal via errorMessage().
sub getOption{
    GetOptions(
        'src-host:s'          => \$SRC_HOST,         'src-port:i'    => \$SRC_PORT,       'src-user:s' => \$SRC_USER,   'src-passwd:s' => \$SRC_PASSWD, 'src-database:s' => \$SRC_DATABASE,
        'des-host:s'          => \$DES_HOST,         'des-port:i'    => \$DES_PORT,       'des-user:s' => \$DES_USER,   'des-passwd:s' => \$DES_PASSWD, 'des-database:s' => \$DES_DATABASE,
        'key:i'               => \$KEY,              'src-mapfile:s' => \$SRC_MAPFILE,    'by-leaf!'   => \$BY_LEAF,    'force!'       => \$FORCE,      'owner'          => \$FORCE_OWNER,
        'from-master!'        => \$MASTER_ONLY,
        't:s'                 => \@TABLE_ARRAY,      'T:s'           => \@EX_TABLE_ARRAY,
        'f:s'                 => \$TABLE_FILE,       'F:s'           => \$EX_TABLE_FILE,  'unignore:s' => \$UNIGNORE_FILE,
        's:s'                 => \@SCHEMA_ARRAY,     'S:s'           => \@EX_SCHEMA_ARRAY,
        # "ignore-hba" matches the parameter-file key and the logged option
        # name; "ignore_hba" is kept as an alias for existing callers.
        'increment!'          => \$INCREMENT,        'ignore-check!' => \$IGNORE_CHECK,   'ignore-hba|ignore_hba' => \$IGNORE_HBA, 'delete!' => \$NEED_DELETE, 'truncate!' => \$NEED_TRUNCATE,
        'analyze!'            => \$NEED_ANALYZE,     'encoding:s'    => \$ENCODING,       'B:i'        => \$BATCH_SIZE, 'transaction'  => \$TRANSACTION, 'compress:s'    => \$COMPRESS,
        'where:s'             => \$GLOBAL_CONDITION, 'log-path:s'    => \$LOG_PATH,
        'parameter-file|pf:s' => \$PARAMETER_FILE,   'h|help!'       => \$IS_HELP,         'version!'  => \$VERSION,
    );
    # passthrough leaves unrecognised arguments in @ARGV; treat them as errors.
    if(@ARGV != 0){
        errorMessage("Some parameters unknown: [@ARGV]\nPlease refer to $CMD_NAME --help");
    }
    if($IS_HELP){
        print $HELP_MESSAGE;
        exitMain(0);
    }
    if($VERSION){
        print "$CMD_NAME 2.1\n";
        exitMain(0);
    }
    if("" ne $PARAMETER_FILE){
        my @parameter_array = readLineFromFile($PARAMETER_FILE);
        for my $line(@parameter_array){
            my ($para,$val) = split(/=/,$line,2);
            ($para,$val) = (trim($para),trim($val));
            # Scalar options are only taken from the file when still unset;
            # list options (-t/-T/-s/-S) accumulate.
            if("src-host"      eq $para && "" eq $SRC_HOST         ){$SRC_HOST             =$val;}
            if("src-port"      eq $para && "" eq $SRC_PORT         ){$SRC_PORT             =$val;}
            if("src-user"      eq $para && "" eq $SRC_USER         ){$SRC_USER             =$val;}
            if("src-passwd"    eq $para && "" eq $SRC_PASSWD       ){$SRC_PASSWD           =$val;}
            if("src-database"  eq $para && "" eq $SRC_DATABASE     ){$SRC_DATABASE         =$val;}
            if("des-host"      eq $para && "" eq $DES_HOST         ){$DES_HOST             =$val;}
            if("des-port"      eq $para && "" eq $DES_PORT         ){$DES_PORT             =$val;}
            if("des-user"      eq $para && "" eq $DES_USER         ){$DES_USER             =$val;}
            if("des-passwd"    eq $para && "" eq $DES_PASSWD       ){$DES_PASSWD           =$val;}
            if("des-database"  eq $para && "" eq $DES_DATABASE     ){$DES_DATABASE         =$val;}
            if("key"           eq $para && "" eq $KEY              ){$KEY                  =$val;}
            if("src-mapfile"   eq $para && "" eq $SRC_MAPFILE      ){$SRC_MAPFILE          =$val;}
            if("by-leaf"       eq $para && "" eq $BY_LEAF          ){$BY_LEAF              =1   ;}
            if("force"         eq $para && "" eq $FORCE            ){$FORCE                =1   ;}
            if("owner"         eq $para && "" eq $FORCE_OWNER      ){$FORCE_OWNER          =$val;}
            if("from-master"   eq $para && "" eq $MASTER_ONLY      ){$MASTER_ONLY          =1   ;}
            if("t"             eq $para                            ){push @TABLE_ARRAY     ,$val;}
            if("T"             eq $para                            ){push @EX_TABLE_ARRAY  ,$val;}
            if("f"             eq $para && "" eq $TABLE_FILE       ){$TABLE_FILE           =$val;}
            if("F"             eq $para && "" eq $EX_TABLE_FILE    ){$EX_TABLE_FILE        =$val;}
            if("unignore"      eq $para && "" eq $UNIGNORE_FILE    ){$UNIGNORE_FILE        =$val;}
            if("s"             eq $para                            ){push @SCHEMA_ARRAY    ,$val;}
            if("S"             eq $para                            ){push @EX_SCHEMA_ARRAY ,$val;}
            if("increment"     eq $para && "" eq $INCREMENT        ){$INCREMENT            =1   ;}
            if("ignore-check"  eq $para && "" eq $IGNORE_CHECK     ){$IGNORE_CHECK         =1   ;}
            if("ignore-hba"    eq $para && "" eq $IGNORE_HBA       ){$IGNORE_HBA           =1   ;}
            if("delete"        eq $para && "" eq $NEED_DELETE      ){$NEED_DELETE          =1   ;}
            if("truncate"      eq $para && "" eq $NEED_TRUNCATE    ){$NEED_TRUNCATE        =1   ;}
            if("analyze"       eq $para && "" eq $NEED_ANALYZE     ){$NEED_ANALYZE         =1   ;}
            if("encoding"      eq $para && "" eq $ENCODING         ){$ENCODING             =$val;}
            if("B"             eq $para && "" eq $BATCH_SIZE       ){$BATCH_SIZE           =$val;}
            if("transaction"   eq $para && "" eq $TRANSACTION      ){$TRANSACTION          =1   ;}
            if("compress"      eq $para && "" eq $COMPRESS         ){$COMPRESS             =$val;}
            if("where"         eq $para && "" eq $GLOBAL_CONDITION ){$GLOBAL_CONDITION     =$val;}
            if("log-path"      eq $para && "" eq $LOG_PATH         ){$LOG_PATH             =$val;}
        }
    }
}
# Validate and normalise the parsed options. Opens the daily log file under
# $LOG_PATH, fills in defaults (ports, users, encoding, key, batch size),
# rejects incompatible combinations via errorMessage(), applies the
# --increment overrides, logs the effective values (passwords masked), and
# decodes "BASE64..."-prefixed credentials.
sub checkOption{
    if("" eq $LOG_PATH){
        $LOG_PATH = $LOG_PATH_DEFAULT;
        logMessage("NOTICE","Log path not specify, Use default log path:$LOG_PATH");
    }
    # List form bypasses the shell, so a path with spaces or shell
    # metacharacters reaches mkdir as a single argument.
    system("mkdir","-m","777","-p",$LOG_PATH);
    my $log_file = $LOG_PATH."/gpdbtransfer_".(strftime("%Y%m%d",localtime)).".log";
    if(!open($LOG_FILE_HANDLE,">>",$log_file)){
        errorMessage("Can't open file:$log_file");
    }else{
        $LOG_FILE_HANDLE->autoflush(1);
        logMessage("INFO","Log file:$log_file");
    }
    if("" eq $BATCH_SIZE || $BATCH_SIZE > $BATCH_MAX || $BATCH_SIZE < $BATCH_MIN){
        logMessage("NOTICE","Not specify or out of limit, use default($BATCH_DEFAULT): -B");
        $BATCH_SIZE = $BATCH_DEFAULT;
    }
    if("" eq $SRC_HOST){
        errorMessage("Please specify parameter: --src-host");
    }
    if("" eq $SRC_PORT){
        $SRC_PORT = "5432";
    }
    if("" eq $SRC_USER){
        $SRC_USER = "gpadmin";
    }
    if("" eq $SRC_DATABASE){
        errorMessage("Please specify parameter: --src-database");
    }elsif(illegal($SRC_DATABASE)){
        errorMessage("Source database name is not legal:[".escape($SRC_DATABASE)."]");
    }
    if("" eq $DES_HOST){
        errorMessage("Please specify parameter: --des-host");
    }
    if("" eq $DES_PORT){
        $DES_PORT = "5432";
    }
    if("" eq $DES_USER){
        $DES_USER = "gpadmin";
    }
    if("" eq $DES_DATABASE){
        errorMessage("Please specify parameter: --des-database");
    }elsif(illegal($DES_DATABASE)){
        errorMessage("Destination database name is not legal:[".escape($DES_DATABASE)."]");
    }
    if("" eq $SRC_MAPFILE){
        errorMessage("Please specify parameter: --src-mapfile");
    }
    if("" eq $ENCODING){
        $ENCODING = "UTF8";
    }
    if($SRC_PORT eq $DES_PORT && $SRC_DATABASE eq $DES_DATABASE && $SRC_HOST eq $DES_HOST){
        errorMessage("Can't transfer data in the same cluster and the same database");
    }
    if("" eq $KEY){
        $KEY = $KEY_DEFAULT;
    }
    # Name the options exactly as they are spelled on the command line.
    if($NEED_TRUNCATE && $NEED_DELETE){
        errorMessage("Can't specify parameter at the same time --truncate with --delete");
    }
    if($IGNORE_CHECK && $FORCE){
        errorMessage("Can't specify parameter at the same time --ignore-check with --force");
    }
    if($INCREMENT){
        logMessage("NOTICE","Increment mode specify, ignore all data filter condition and force use --truncate and --by-leaf and disable --force");
        $LAST_STAT_FILE_NAME = $LOG_PATH."/gpdbtransfer_".$SRC_HOST.".".$SRC_PORT.".".$SRC_DATABASE.".".$DES_HOST.".".$DES_PORT.".".$DES_DATABASE.".tag";
        # Ensure the stat file exists so later reads do not fail on first run.
        system("touch",$LAST_STAT_FILE_NAME);
        logMessage("INFO","Last stat file name:\n$LAST_STAT_FILE_NAME");
        $GLOBAL_CONDITION = "";
        $NEED_TRUNCATE = "1";
        $NEED_DELETE = "";
        $BY_LEAF = "1";
        $FORCE = "";
    }
    $GLOBAL_CONDITION="" ne $GLOBAL_CONDITION ? " where ".$GLOBAL_CONDITION : "";
    logMessage("INFO","Option values: --src-host $SRC_HOST --src-port $SRC_PORT --src-user $SRC_USER --src-passwd ****** --src-database $SRC_DATABASE");
    logMessage("INFO","Option values: --des-host $DES_HOST --des-port $DES_PORT --des-user $DES_USER --des-passwd ****** --des-database $DES_DATABASE");
    logMessage("INFO","Option values: --key $KEY --src-mapfile $SRC_MAPFILE --by-leaf $BY_LEAF --force $FORCE --owner $FORCE_OWNER --from-master $MASTER_ONLY");
    logMessage("INFO","Option values: -t ".join(' -t ',@TABLE_ARRAY)." -T ".join(' -T ',@EX_TABLE_ARRAY));
    logMessage("INFO","Option values: -f $TABLE_FILE -F $EX_TABLE_FILE --unignore $UNIGNORE_FILE");
    logMessage("INFO","Option values: -s ".join(' -s ',@SCHEMA_ARRAY)." -S ".join(' -S ',@EX_SCHEMA_ARRAY));
    logMessage("INFO","Option values: --increment $INCREMENT --ignore-check $IGNORE_CHECK --ignore-hba $IGNORE_HBA --delete $NEED_DELETE --truncate $NEED_TRUNCATE --analyze $NEED_ANALYZE --encoding $ENCODING -B $BATCH_SIZE");
    logMessage("INFO","Option values: --transaction $TRANSACTION --compress $COMPRESS --where $GLOBAL_CONDITION --log-path $LOG_PATH --parameter-file $PARAMETER_FILE");
    # Credentials may be given as "BASE64<encoded>" (prefix case-insensitive).
    # Decode them only after logging so the plain values are never written.
    for my $ref((\$SRC_USER,\$SRC_PASSWD,\$DES_USER,\$DES_PASSWD)){
        if(uc($$ref) =~ /^BASE64/){
            $$ref = trim(decode(substr($$ref,6)));
        }
    }
    $COMPRESS = lc($COMPRESS);
    if($COMPRESS eq ""){
        $COMPRESS = "0";
    }elsif($COMPRESS ne "gzip" && $COMPRESS ne "zstd"){
        errorMessage("Current only support gzip and zstd compression method");
    }
}
# Load the destination's non-unreserved SQL keywords (upper-cased by
# $SQL_GET_KEY_WORD) into %KEY_WORD_SET. quote() uses this set to decide
# which identifiers must be double-quoted.
sub getKeyWord{
    $KEY_WORD_SET{$_->[0]} = "" for queryResult("DES",$SQL_GET_KEY_WORD);
}
# Take the first digit after "Greenplum Database" in version() on the source
# and then the destination, and store it in $SRC_DATABASE_VERSION /
# $DES_DATABASE_VERSION. A variable is left untouched when its banner does
# not match.
sub getVersion{
    my %target_of = ("SRC" => \$SRC_DATABASE_VERSION, "DES" => \$DES_DATABASE_VERSION);
    for my $side("SRC","DES"){
        my $banner = queryResult($side,"SELECT version();","Scalar");
        if($banner =~ /Greenplum Database (\d)/){
            ${$target_of{$side}} = int($1);
        }
    }
}
# Increment mode only: make sure gp_toolkit.gp_segment_config on the source
# has the expected DDL (compared by md5 of its column list) and correct rows.
# Recreates the table when the DDL differs. Truncates and re-inserts the rows
# when the record check fails, and calls errorMessage() if they are still
# wrong afterwards.
sub checkSrcRandomTable{
    return if !$INCREMENT;
    my $ddl_md5 = queryResult("SRC",$RANDOM_TABLE_DDL_CHECK_SQL,"Scalar");
    if($ddl_md5 ne $RANDOM_TABLE_DDL_CHECK_MD5){
        logMessage("NOTICE","No table gp_toolkit.gp_segment_config or need recreate it");
        queryResult("SRC",$RANDOM_TABLE_DDL_SCRIPT);
    }
    # GP6+ uses a different record-check query.
    my $check_sql = $SRC_DATABASE_VERSION > 5 ? $RANDOM_TABLE_RECORD_CHECK_SQL_V6 : $RANDOM_TABLE_RECORD_CHECK_SQL;
    # The check returns one row: segment count, row count, missing entries,
    # wrong entries.
    my $is_broken = sub {
        my ($segments,$total,$no_entry,$wrong) = @{(queryResult("SRC",$check_sql))[0]};
        return $segments ne $total || $no_entry ne 0 || $wrong ne 0;
    };
    if(!$is_broken->()){
        logMessage("INFO","Racord in gp_toolkit.gp_segment_config is OK");
        return;
    }
    logMessage("INFO","Racord in gp_toolkit.gp_segment_config incorrect, reinsert");
    queryResult("SRC","truncate table gp_toolkit.gp_segment_config;");
    queryResult("SRC",$RANDOM_TABLE_RECORD_INSERT_SQL);
    if($is_broken->()){
        errorMessage("Can not create right record in table table gp_toolkit.gp_segment_config");
    }
}
# Prepare the source cluster:
#   1. create language plpythonu if it is missing;
#   2. (re)install gp_toolkit.gp_transfer_execute() when its argument list
#      or body md5 differs from the expected values;
#   3. record $SRC_GPHOME (base64-encoded). It is $GPHOME from the source
#      master when compression is enabled, and a single space otherwise.
sub checkSrcCluster{
    logMessage("INFO","Check SRC plpythonu language");
    my $lang_row = queryResult("SRC","select 1 from pg_language where lanname='plpythonu';","Scalar");
    if("" ne $lang_row){
        logMessage("INFO","SRC language plpythonu is OK");
    }else{
        logMessage("INFO","SRC language plpythonu not exists, create it");
        queryResult("SRC","Create language plpythonu;");
    }
    logMessage("INFO","Check SRC function gp_toolkit.gp_transfer_execute");
    my $installed_args = queryResult("SRC",$SQL_EXECUTE_FUNCTION_ARG_CHECK,"Scalar");
    my $installed_md5 = queryResult("SRC",$SQL_EXECUTE_FUNCTION_BODY_CHECK,"Scalar");
    if($installed_args eq $STR_EXECUTE_FUNCTION_ARG_CHECK && $installed_md5 eq $MD5_EXECUTE_FUNCTION_BODY_CHECK){
        logMessage("INFO","SRC function gp_toolkit.gp_transfer_execute is OK");
    }else{
        logMessage("INFO","SRC function gp_toolkit.gp_transfer_execute not exists or need upgrade");
        queryResult("SRC","drop function if exists gp_toolkit.gp_transfer_execute($installed_args);");
        queryResult("SRC",$SQL_EXECUTE_FUNCTION_DDL);
    }
    my $gphome = " ";
    if($COMPRESS ne "0"){
        $gphome = queryResult("SRC",q{select gp_toolkit.gp_transfer_execute('echo $GPHOME')},"Scalar");
    }
    $SRC_GPHOME = encode($gphome);
}
# Unless --ignore-hba is set, compare the md5 of pg_hba.conf on every source
# segment with the md5 of $STR_SEG_PG_HBA. If they differ (or segments
# disagree with each other), overwrite each segment's pg_hba.conf with
# $STR_SEG_PG_HBA and run "gpstop -u" through gp_transfer_execute to reload.
sub checkSrcSegHba{
    if($IGNORE_HBA){
        logMessage("INFO","Ignore check SRC segment pg_hba.conf");
        return;
    }
    logMessage("INFO","Check SRC segment pg_hba.conf");
    my $str_base64 = encode_base64($STR_SEG_PG_HBA);
    # Hash the expected content through the same base64 round trip used to
    # write it, so the local md5 matches what the segments will hold.
    my $str_md5 = trim(readpipe(qq{echo "$str_base64"|base64 -di|md5sum|cut -c 1-32}));
    my $checkSql = qq{select distinct gp_toolkit.gp_transfer_execute('md5sum pg_hba.conf|cut -c 1-32') from gp_dist_random('gp_id');};
    my @result = queryResult("SRC",$checkSql);
    # More than one distinct md5 means the segments disagree with each other.
    if(@result > 1 || $str_md5 ne $result[0][0]){
        logMessage("INFO","SRC segment pg_hba.conf need open and reload");
        # queryResult() calls errorMessage() itself on failure, so the
        # return values are not needed here.
        queryResult("SRC",qq{select gp_toolkit.gp_transfer_execute('echo "$str_base64"|base64 -di > pg_hba.conf') from gp_dist_random('gp_id');});
        queryResult("SRC",qq{select gp_toolkit.gp_transfer_execute('gpstop -u');});
    }
}
# Increment mode only: (re)install gp_toolkit.gp_heap_table_stat() on the
# source when its argument signature or source md5 differs from the
# expected values.
sub checkSrcHeapStatFunction{
    return if !$INCREMENT;
    my $installed_args = queryResult("SRC",$HEAP_STAT_FUNCTION_ARG_CHECK_SQL,"Scalar");
    my $installed_md5 = queryResult("SRC",$HEAP_STAT_FUNCTION_SRC_CHECK_SQL,"Scalar");
    my $up_to_date = $installed_args eq $HEAP_STAT_FUNCTION_ARG_CHECK_VALUE && $installed_md5 eq $HEAP_STAT_FUNCTION_SRC_CHECK_MD5;
    if($up_to_date){
        logMessage("INFO","Heap stat function gp_toolkit.gp_heap_table_stat is OK");
        return;
    }
    logMessage("NOTICE","No heap stat function or need replace, create or replace it");
    # Drop using the currently installed signature before recreating.
    queryResult("SRC","DROP FUNCTION IF EXISTS gp_toolkit.gp_heap_table_stat($installed_args);");
    queryResult("SRC",$HEAP_STAT_FUNCTION_DDL);
}
# Prepare the destination cluster. Ensures plpythonu exists, (re)installs
# gp_toolkit.gp_transfer_execute() when its argument list or body md5
# differs, then uses that function to create /tmp/fifo (mode 777) on the
# master and on every segment.
sub checkDestCluster{
    logMessage("INFO","Check destination plpythonu language");
    my $lang_row = queryResult("DES","select 1 from pg_language where lanname='plpythonu';","Scalar");
    if("" ne $lang_row){
        logMessage("INFO","Destination language plpythonu is OK");
    }else{
        logMessage("INFO","Destination language plpythonu not exists, create it");
        queryResult("DES","Create language plpythonu;");
    }
    logMessage("INFO","Check destination function gp_toolkit.gp_transfer_execute");
    my $installed_args = queryResult("DES",$SQL_EXECUTE_FUNCTION_ARG_CHECK,"Scalar");
    my $installed_md5 = queryResult("DES",$SQL_EXECUTE_FUNCTION_BODY_CHECK,"Scalar");
    if($installed_args eq $STR_EXECUTE_FUNCTION_ARG_CHECK && $installed_md5 eq $MD5_EXECUTE_FUNCTION_BODY_CHECK){
        logMessage("INFO","Destination function gp_toolkit.gp_transfer_execute is OK");
    }else{
        logMessage("INFO","Destination function gp_toolkit.gp_transfer_execute not exists or need upgrade");
        queryResult("DES","drop function if exists gp_toolkit.gp_transfer_execute($installed_args);");
        queryResult("DES",$SQL_EXECUTE_FUNCTION_DDL);
    }
    # Same call on the master, then once per segment via gp_dist_random.
    my $mkdir_call = qq{select gp_toolkit.gp_transfer_execute('mkdir -m 777 -p /tmp/fifo')};
    queryResult("DES","$mkdir_call;","Scalar");
    queryResult("DES","$mkdir_call from gp_dist_random('gp_id');","Scalar");
}
# Load --src-mapfile ("hostname,ip" per line, '#' comments allowed) into
# %SRC_HOSTMAP, with both fields trimmed. A missing option or missing file
# goes to errorMessage().
sub generateSrcHostMap{
    errorMessage("Must specify parameter --src-mapfile") if ("" eq $SRC_MAPFILE);
    errorMessage("File not exists named:$SRC_MAPFILE") unless (-e $SRC_MAPFILE);
    for my $entry(readLineFromFile($SRC_MAPFILE)){
        my ($host,$addr) = split(/,/,$entry);
        $SRC_HOSTMAP{trim($host)} = trim($addr);
    }
}
# Record source and destination primary-segment counts in $SRC_SEG_SIZE and
# $DES_SEG_SIZE. Build "content,address,port" lines for every up primary on
# the source (including the master, content -1), replacing a hostname with
# its IP from %SRC_HOSTMAP when one is mapped. Write the list to
# /tmp/fifo/$KEY.info on every destination segment and on the master.
sub generateSrcClusterInfo{
    logMessage("INFO","Generate source database hostname map relation-ship");
    $SRC_SEG_SIZE = queryResult("SRC","select count(*) from gp_segment_configuration where status='u' and role='p' and content>-1;","Scalar");
    $DES_SEG_SIZE = queryResult("DES","select count(*) from gp_segment_configuration where status='u' and role='p' and content>-1;","Scalar");
    my @src_host_array = queryResult("SRC",qq{select content,address,port from gp_segment_configuration where status='u' and role='p' order by content;});
    logMessage("INFO","SRC cluster size is: [$SRC_SEG_SIZE], destination cluster size is: [$DES_SEG_SIZE]");
    my @srcClusterInfoArray = ();
    # Remembers hostnames already warned about, so each is reported once.
    my %hostname_hash = ();
    for my $row(@src_host_array){
        my ($content,$hostname,$port) = @$row;
        my $ip_address = $hostname;
        if(exists $SRC_HOSTMAP{$hostname}){
            $ip_address = $SRC_HOSTMAP{$hostname};
        }elsif(!exists $hostname_hash{$hostname}){
            logMessage("NOTICE","Hostname [$hostname] not specify a ip address with --src-mapfile");
            $hostname_hash{$hostname} = "";
        }
        push @srcClusterInfoArray,$content.','.$ip_address.','.$port;
    }
    my $srcClusterInfo = join("\n",@srcClusterInfoArray)."\n";
    my $str_base64 = encode_base64($srcClusterInfo);
    # queryResult() calls errorMessage() itself on failure; no return value needed.
    queryResult("DES",qq{select gp_toolkit.gp_transfer_execute('echo "$str_base64"|base64 -di > /tmp/fifo/$KEY.info') from gp_dist_random('gp_id');});
    queryResult("DES",qq{select gp_toolkit.gp_transfer_execute('echo "$str_base64"|base64 -di > /tmp/fifo/$KEY.info');});
}
# Make sure /tmp/fifo/gpdbtransfercat.sh on the destination master and all
# segments matches $CMD_CLEVER_COMMAND (compared by md5). Rewrite it
# everywhere if any copy is missing or differs.
sub checkCleverCommand{
    logMessage("INFO","Check destination magic command file");
    my $checkSql = qq{select distinct gp_toolkit.gp_transfer_execute('if [ -f /tmp/fifo/gpdbtransfercat.sh ];then md5sum /tmp/fifo/gpdbtransfercat.sh|cut -c 1-32;fi') from gp_dist_random('gp_id')
union
select gp_toolkit.gp_transfer_execute('if [ -f /tmp/fifo/gpdbtransfercat.sh ];then md5sum /tmp/fifo/gpdbtransfercat.sh|cut -c 1-32;fi');};
    my @result = queryResult("DES",$checkSql);
    my $clever_command = $CMD_CLEVER_COMMAND;
    my $str_base64 = encode_base64($clever_command);
    my $str_md5 = trim(readpipe(qq{echo "$str_base64"|base64 -di|md5sum|cut -c 1-32}));
    # UNION de-duplicates, so more than one row means the copies disagree.
    if(@result > 1 || $str_md5 ne $result[0][0]){
        logMessage("INFO","Destination magic command file [/tmp/fifo/gpdbtransfercat.sh] not exists or need upgrade");
        # queryResult() calls errorMessage() itself on failure; no return value needed.
        queryResult("DES",qq{select gp_toolkit.gp_transfer_execute('echo "$str_base64"|base64 -di > /tmp/fifo/gpdbtransfercat.sh');});
        queryResult("DES",qq{select gp_toolkit.gp_transfer_execute('echo "$str_base64"|base64 -di > /tmp/fifo/gpdbtransfercat.sh') from gp_dist_random('gp_id');});
    }
}
# Parse a table-list file with lines of the form
#   src_schema.table[=>des_schema.table][;where-condition]
# and return a list of [src, des, where] array refs (each trimmed). In
# --increment mode the destination is forced to the source and the where
# part is dropped. An empty path returns an empty list; a missing file goes
# to errorMessage().
sub getTableFromFile{
    my ($file_path) = @_;
    my @entries;
    return @entries if ("" eq $file_path);
    errorMessage("File not exists named:$file_path") if (!-e $file_path);
    for my $raw(readLineFromFile($file_path)){
        my $text = trim($raw);
        next if ($text =~ /^#/ || "" eq $text);
        my ($table_spec,$filter) = split(/;/,$text,2);
        my ($from,$to) = split(/=>/,$table_spec);
        ($to,$filter) = ($from,"") if ($INCREMENT);
        push @entries,[trim($from),trim($to),trim($filter)];
    }
    return @entries;
}
# On Greenplum 6+ sources, record every replicated table as a
# "schema.table" key in %HASH_REPLICATION_TABLE. The query returns
# base64-encoded schema and table names.
sub getReplicationTable{
    return unless ($SRC_DATABASE_VERSION > 5);
    for my $rec(queryResult("SRC",$GET_REPLICATION_TABLE_SQL)){
        my ($schema_b64,$rel_b64) = @$rec;
        $HASH_REPLICATION_TABLE{decode($schema_b64).".".decode($rel_b64)} = "";
    }
}
# Return the source table rows: the parent/child (leaf) listing when
# --by-leaf is set, otherwise the plain user-table listing.
sub getAllUserTable{
    my $listing_sql = $BY_LEAF ? $GET_PARENT_CHILD_LIST_SQL : $GET_ALL_USER_TABLE_SQL;
    return queryResult("SRC",$listing_sql);
}
# Return the source's user views, or an empty list in --increment mode
# (views are not transferred incrementally).
sub getAllUserView{
    return $INCREMENT ? () : queryResult("SRC",$SQL_GET_ALL_USER_VIEW);
}
# Build the global @TABLE_ARRAY of relations to transfer.
# Inputs: the source catalog listing (getAllUserTable), -t/-f includes,
# -T/-F excludes, and -s/-S schema filters.
# Each output row is:
#   [parent_schema, parent_rel, ao_seg_rel, src_schema, src_rel, policy
#    ("p" or "r" for replicated), storage type, timestamp, where, relkind
#    ("r" table / "v" view), quoted source name or view deps, des_schema,
#    des_rel]
# Names failing illegal() are logged and skipped. Views are only added when
# explicitly listed and not in --increment mode.
sub processTableArray{
    my @all_parent_child_list = getAllUserTable();
    my $all_user_table_size = @all_parent_child_list;
    logMessage("INFO","All table number in database is: [$all_user_table_size]");
    # Catalog rows carry base64-encoded schema/table names; decode them.
    my @all_table_temp_list = ();
    for my $row(@all_parent_child_list){
        my ($p_ao,$p_scma,$p_rel,$p_typ,$p_tim,$c_ao,$c_scma,$c_rel,$c_typ,$c_tim) = @$row;
        ($p_scma,$p_rel,$c_scma,$c_rel) = (decode($p_scma),decode($p_rel),decode($c_scma),decode($c_rel));
        push @all_table_temp_list,[($p_ao,$p_scma,$p_rel,$p_typ,$p_tim,$c_ao,$c_scma,$c_rel,$c_typ,$c_tim)];
    }
    @all_parent_child_list = @all_table_temp_list;
    #####################################################################################################
    # Included tables: -t maps to [schema, rel]; -f maps to [des_schema, des_rel, where].
    my %specify_table_hash = ();
    for my $table(@TABLE_ARRAY){
        my ($scma,$rel) = split(/\./,$table,2);
        if(illegal($scma,$rel)){
            logMessage("WARN","Table name specify is not legal:[$table]");
            next;
        }
        $specify_table_hash{$table} = ([$scma,$rel]);
    }
    for my $row(getTableFromFile($TABLE_FILE)){
        my ($src_table,$des_table,$where) = @$row;
        if("" eq $des_table){
            $des_table = $src_table;
        }
        my ($src_scma,$src_rel) = split(/\./,$src_table,2);
        my ($des_scma,$des_rel) = split(/\./,$des_table,2);
        if(illegal($src_scma,$src_rel,$des_scma,$des_rel)){
            logMessage("WARN","Table name in file is not legal:[$src_table => $des_table]");
            next;
        }
        $specify_table_hash{$src_table} = ([$des_scma,$des_rel,$where]);
    }
    #####################################################################################################
    # Excluded tables from -T and -F.
    my %ex_table_hash = ();
    for my $table(@EX_TABLE_ARRAY){
        my ($scma,$rel) = split(/\./,$table,2);
        if(illegal($scma,$rel)){
            logMessage("WARN","Ex table name specify is not legal:[$table]");
            next;
        }
        $ex_table_hash{$table} = "";
    }
    for my $row(getTableFromFile($EX_TABLE_FILE)){
        my ($table) = @$row;
        my ($scma,$rel) = split(/\./,$table,2);
        if(illegal($scma,$rel)){
            logMessage("WARN","Table name in file is not legal:[$table]");
            next;
        }
        $ex_table_hash{$table} = "";
    }
    #####################################################################################################
    my %specify_schema_hash = ();
    for my $schema(@SCHEMA_ARRAY){
        $specify_schema_hash{$schema} = "";
    }
    my %ex_schema_hash = ();
    for my $schema(@EX_SCHEMA_ARRAY){
        $ex_schema_hash{$schema} = "";
    }
    #####################################################################################################
    # Any include filter switches from "everything" to "only what was listed".
    my $check_specify = 0;
    if(keys %specify_table_hash > 0 || keys %specify_schema_hash > 0 || "" ne $TABLE_FILE){
        $check_specify = 1;
        logMessage("INFO","Transfer relation from parameter or file in database: $SRC_DATABASE");
    }else{
        logMessage("INFO","Transfer all user table in database: $SRC_DATABASE");
    }
    #####################################################################################################
    @TABLE_ARRAY = ();
    for my $row(@all_parent_child_list){
        my ($p_ao,$p_scma,$p_rel,$p_typ,$p_tim,$c_ao,$c_scma,$c_rel,$c_typ,$c_tim) = @$row;
        my $p_table = $p_scma.".".$p_rel;
        my $c_table = $c_scma.".".$c_rel;
        if(illegal($p_scma,$p_rel,$c_scma,$c_rel)){
            logMessage("FAILED","Table name is not legal:[".escape($c_table eq "." ? $p_table : $c_table)."]");
            next;
        }
        if(exists $ex_table_hash{$p_table} || exists $ex_table_hash{$c_table} || exists $ex_schema_hash{$p_scma}){
            next;
        }
        if($check_specify && (not exists $specify_table_hash{$p_table}) && (not exists $specify_table_hash{$c_table}) && (not exists $specify_schema_hash{$p_scma})){
            next;
        }
        # With --by-leaf, transfer the leaf partition when the row has one.
        my ($src_ao,$src_scma,$src_rel,$src_typ,$src_tim,$where);
        if($BY_LEAF && "" ne $c_scma){
            ($src_ao,$src_scma,$src_rel,$src_typ,$src_tim,$where) = ($c_ao,$c_scma,$c_rel,$c_typ,$c_tim,"");
        }else{
            ($src_ao,$src_scma,$src_rel,$src_typ,$src_tim,$where) = ($p_ao,$p_scma,$p_rel,$p_typ,$p_tim,"");
        }
        my ($des_scma,$des_rel) = ($src_scma,$src_rel);
        my $src_table = $src_scma.".".$src_rel;
        my $src_parent = $p_scma.".".$p_rel;
        # An exact entry supplies destination and condition. A leaf whose
        # parent was listed inherits only the parent's where condition.
        if(exists $specify_table_hash{$src_table}){
            ($des_scma,$des_rel,$where) = @{$specify_table_hash{$src_table}};
        }elsif(exists $specify_table_hash{$src_parent}){
            $where = $specify_table_hash{$src_parent}->[2];
        }
        my $policytype = "p";
        if(exists $HASH_REPLICATION_TABLE{$src_scma.".".$src_rel}){
            $policytype = "r";
        }
        push @TABLE_ARRAY,[($p_scma,$p_rel,$src_ao,$src_scma,$src_rel,$policytype,$src_typ,$src_tim,$where,"r",qq{"$src_scma"."$src_rel"},$des_scma,$des_rel)];
    }
    # Views are added only when named explicitly (never in increment mode).
    if($check_specify && !$INCREMENT){
        my @all_user_view_list = getAllUserView();
        my $all_user_view_size = @all_user_view_list;
        logMessage("INFO","All view number in database is: [$all_user_view_size]");
        my @all_view_temp_list = ();
        for my $row(@all_user_view_list){
            my($scma,$rel,$dep_tables) = @$row;
            push @all_view_temp_list,[(decode($scma),decode($rel),decode($dep_tables))];
        }
        @all_user_view_list = @all_view_temp_list;
        for my $row(@all_user_view_list){
            my($src_scma,$src_rel,$dep_tables) = @$row;
            my $src_table = $src_scma.".".$src_rel;
            if(illegal($src_scma,$src_rel)){
                logMessage("FAILED","Table name is not legal:[".escape($src_table)."]");
                next;
            }
            if(exists $ex_table_hash{$src_table} || exists $ex_schema_hash{$src_scma}){
                next;
            }
            if(not exists $specify_table_hash{$src_table}){
                next;
            }
            my ($des_scma,$des_rel,$where) = @{$specify_table_hash{$src_table}};
            push @TABLE_ARRAY,[($src_scma,$src_rel,"",$src_scma,$src_rel,"p","","",$where,"v",$dep_tables,$des_scma,$des_rel)];
        }
    }
}
# Issue CHECKPOINT on the source. Logs success, or passes psql's output to
# errorMessage() on a non-zero exit code.
sub tryCheckpoint{
    my ($rc,$output) = queryResult("SRC",qq{CHECKPOINT;},"CV");
    if($rc ne 0){
        errorMessage("Try to execute checkpoint failed:\n".$output);
    }else{
        logMessage("INFO","Try to execute checkpoint success");
    }
}
# Increment mode only: read the last-transfer stat file, whose lines are
# "base64name;db.schema.table;stamp;count". Returns a hash keyed by
# "schema.table" (the database prefix is dropped) with [stamp, count] values.
# For heap tables the stamp slot holds an md5.
sub getLastStatFromFile{
    return () if (!$INCREMENT);
    my %stat_of = ();
    for my $record(readLineFromFile($LAST_STAT_FILE_NAME)){
        my (undef,$qualified,$stamp,$rows) = split(/;/,$record);
        my $table = (split(/\./,$qualified,2))[1];
        $stat_of{$table} = [$stamp,$rows];
    }
    return %stat_of;
}
# Increment mode only: for each AO table ("a") in @TABLE_ARRAY, read
# sum(modcount) from its pg_aoseg relation on the source. Returns a hash of
# "schema.table" => modcount. Queries are combined with UNION ALL in batches
# of up to $SQL_BATCH sub-selects. AO tables without an aoseg relation are
# warned about and skipped.
sub getAOLastStatFromCatalog{
    if(!$INCREMENT){
        return ();
    }
    my %last_stat_map = ();
    my @query_array = ();
    my $all_index = 0;
    for my $row(@TABLE_ARRAY){
        my (undef,undef,$ao,$scma,$rel,undef,$typ) = @$row;
        $all_index += 1;
        if($typ eq "a"){
            if("" eq $ao){
                logMessage("WARN","AO table $scma.$rel missing aoseg table, it may be transfer always and may occur error");
            }else{
                push @query_array,"select '$scma','$rel',coalesce(sum(modcount),0) from pg_aoseg.$ao";
            }
        }
        # Flush when the batch is full or at the end of the list. Never send
        # psql an empty query.
        if(@query_array && (@query_array >= $SQL_BATCH || $all_index == @TABLE_ARRAY)){
            for my $stat_row(queryResult("SRC",join("\nUNION ALL\n",@query_array))){
                my ($c_scma,$c_rel,$count) = @$stat_row;
                $last_stat_map{$c_scma.".".$c_rel} = $count;
            }
            @query_array = ();
        }
    }
    return %last_stat_map;
}
# Increment mode only: compute a per-table fingerprint for each heap table
# ("h") in @TABLE_ARRAY. It is the md5 of gp_heap_table_stat() results
# gathered from every segment via gp_dist_random. Returns a hash of
# "schema.table" => md5. Sub-selects are combined with UNION ALL in batches.
sub getHeapLastStatFromCatalog{
    if(!$INCREMENT){
        return ();
    }
    logMessage("INFO","Check heap table stat, if heap table amount is larger, it might cost more time......");
    # NOTE(review): kept below $SQL_BATCH, presumably because each sub-select
    # fans out to every segment; confirm before changing.
    my $heap_batch = 100;
    my %last_stat_map = ();
    my @query_array = ();
    my $all_index = 0;
    for my $row(@TABLE_ARRAY){
        my (undef,undef,undef,$scma,$rel,undef,$typ) = @$row;
        $all_index += 1;
        if($typ eq "h"){
            my $query_sql = qq{select '$scma','$rel',md5(string_agg(id||','||t, chr(10) order by id,t)) from(\n};
            $query_sql = $query_sql.qq{select gp_segment_id id,gp_toolkit.gp_heap_table_stat($SRC_DATABASE_VERSION,'$SRC_DATABASE',database_port,database_path,'$scma.$rel') t\n};
            $query_sql = $query_sql.qq{from gp_dist_random('gp_toolkit.gp_segment_config'))x};
            push @query_array,$query_sql;
        }
        # Flush when the batch is full or at the end of the list. Never send
        # psql an empty query.
        if(@query_array && (@query_array >= $heap_batch || $all_index == @TABLE_ARRAY)){
            for my $stat_row(queryResult("SRC",join("\nUNION ALL\n",@query_array))){
                my ($c_scma,$c_rel,$md5) = @$stat_row;
                $last_stat_map{$c_scma.".".$c_rel} = $md5;
            }
            @query_array = ();
        }
    }
    return %last_stat_map;
}
# Rewrite @TABLE_ARRAY into the shape used by the transfer stage:
#   [p_schema, p_rel, src_table, policy, des_table, where, relkind, deps(, stat1, stat2)]
# Outside --increment mode every table is kept. In --increment mode:
#   - a heap table is kept when its catalog md5 differs from the stat
#     file's value;
#   - an AO table is kept when its timestamp or modcount differs from the
#     stat file's value;
#   - any table listed in --unignore is always kept;
#   - other tables are dropped and counted as ignored.
sub processLastTransferStat{
    my %last_stat_from_file = getLastStatFromFile();
    my %ao_stat_now = getAOLastStatFromCatalog();
    my %heap_stat_now = getHeapLastStatFromCatalog();
    my %force_include = ();
    if($INCREMENT){
        for my $entry(getTableFromFile($UNIGNORE_FILE)){
            my $name = $entry->[0];
            my ($scma,$rel) = split(/\./,$name,2);
            if(illegal($scma,$rel)){
                logMessage("WARN","Table name in file is not legal:[$name]");
                next;
            }
            $force_include{$name} = "";
        }
    }
    my @kept = ();
    my $skipped = 0;
    for my $entry(@TABLE_ARRAY){
        my ($p_scma,$p_rel,undef,$src_scma,$src_rel,$policytype,$src_typ,$src_tim,$where,$relkind,$dep_tables,$des_scma,$des_rel) = @$entry;
        my $src_table = $src_scma.".".$src_rel;
        my $des_table = $des_scma.".".$des_rel;
        my @common = ($p_scma,$p_rel,$src_table,$policytype,$des_table,$where,$relkind,$dep_tables);
        if(!$INCREMENT){
            push @kept,[@common];
            next;
        }
        my ($prev_stamp,$prev_count) = exists $last_stat_from_file{$src_table} ? @{$last_stat_from_file{$src_table}} : ();
        my $forced = exists $force_include{$src_table};
        if("h" eq $src_typ){
            my $md5 = $heap_stat_now{$src_table};
            if($forced || "" eq $prev_stamp || $prev_stamp ne $md5){
                push @kept,[@common,$md5,0];
            }else{
                $skipped++;
            }
        }elsif("a" eq $src_typ){
            my $count = $ao_stat_now{$src_table};
            if($forced || "" eq $prev_stamp || $prev_stamp ne $src_tim || $count ne $prev_count){
                push @kept,[@common,$src_tim,$count];
            }else{
                $skipped++;
            }
        }
    }
    if($INCREMENT){
        logMessage("INFO","Ignore table number for increment stat not change is: [$skipped]");
    }
    @TABLE_ARRAY = @kept;
}
sub checkConflictProcess{
    # Working directory, created with an explicit mode 777 (-m ignores umask).
    system(qq{mkdir -m 777 -p /tmp/fifo});
    my $lock_path = "/tmp/fifo/.gpdbtransfer.$KEY.lock";
    # The handle is kept in a file-level variable so the lock stays held
    # for as long as the handle remains open.
    open($LOCK_FILE_HANDLE,">",$lock_path)
        or errorMessage("Can't open lock file: $lock_path");
    # Non-blocking exclusive lock: another run using the same key is
    # rejected instead of waiting (errorMessage presumably aborts).
    flock($LOCK_FILE_HANDLE, LOCK_EX | LOCK_NB)
        or errorMessage("Lock file is in using, you can try again later");
}
sub checkTransferSchema{
    # Make sure the destination has the "gpdbtransfer" schema, creating it if absent.
    my $schema_count = queryResult("DES","SELECT count(*) from pg_namespace where nspname='gpdbtransfer';","Scalar");
    return unless 0 == $schema_count;
    logMessage("NOTICE","No schema named gpdbtransfer in destination database, create it.");
    queryResult("DES","create schema gpdbtransfer;","Scalar");
}
sub checkContentConsistent{
    # Probe the segment-to-segment path with a throw-away external web table
    # that runs gpdbtransfercat.sh on every destination segment. Skipped when
    # only the master is used.
    return if $MASTER_ONLY;
    my $probe_table = 'gpdbtransfer.transform_'.$KEY.'_0';
    queryResult("DES","drop external table if exists $probe_table;","CV");
    my $password = ("" eq $SRC_PASSWD) ? "--" : $SRC_PASSWD;
    my $base64_pass = encode($password);
    my $base64_lock = encode("--");
    my $base64_where = encode("TEST");
    my $probe_sql = join("",
        "drop external table if exists $probe_table;\n",
        "create external web table $probe_table(\n",
        "cntt int\n",
        ")execute E'sh /tmp/fifo/gpdbtransfercat.sh $SRC_GPHOME.$KEY.$SRC_USER.$base64_pass.0.$DES_SEG_SIZE",
        ".S.\$GP_SEG_PORT.$SRC_DATABASE.M.C.$base64_lock.$ENCODING.0.$COMPRESS.$base64_where'",
        "\nON ALL FORMAT 'TEXT' ENCODING '$ENCODING';",
        "\nselect count(*) from $probe_table;",
    );
    my ($code,$value) = queryResult("DES",$probe_sql,"CV");
    queryResult("DES","drop external table if exists $probe_table;","CV");
    return if $code <= 0;
    errorMessage("Some source hostname may not match address or ssh without password to src cluster occur error:\n".$value);
}
# Compare the source column list with the destination relation and, when
# $FORCE is set, (re)create the destination table so it matches the source.
#
# Params:
#   $src_column   - arrayref of rows from getSrcTableColumn:
#                   [attname, formatted type, (FORCE only) base64 table
#                   description, base64 column description]
#   $src_table    - "schema.table" source name, used in messages
#   $des_schema   - destination schema name
#   $des_relation - destination relation name
# Returns:
#   "" when the destination matches or was (re)created successfully,
#   otherwise a problem description string.
sub checkTableDefinition{
    my ($src_column,$src_table,$des_schema,$des_relation) = @_;
    my $des_kind_sql = qq{select c.relkind from pg_namespace n,pg_class c  where c.relnamespace=n.oid and n.nspname='$des_schema' and c.relname='$des_relation';};
    my ($code,$relkind) = queryResult("DES",$des_kind_sql,"CV");
    if($code != 0){
        return "Check destination relation occur error:".$relkind;
    }elsif("" ne $relkind && "r" ne $relkind){
        return "Destination relation $des_schema.$des_relation is not a table";
    }
    my $recreate_sql = "";
    my $diff_column = "";
    my @src_column_array = @$src_column;
    my $src_column_size = @src_column_array;
    if("r" eq $relkind){
        # Only the column count and the positional column types are compared, not the names.
        my $des_query_sql = qq{select attname,format_type(atttypid,atttypmod) from pg_namespace n,pg_class c,pg_attribute a where c.relnamespace=n.oid and c.oid=a.attrelid};
        $des_query_sql = $des_query_sql.qq{ and c.relkind='r' and n.nspname='$des_schema' and c.relname='$des_relation' and a.attnum>0 and not a.attisdropped order by a.attnum;};
        my ($code,@des_column_array) = queryResult("DES",$des_query_sql,"CR");
        if($code != 0){
            return "Get destination relation columns occur error:\n".join("\n",@des_column_array);
        }
        my $des_column_size = @des_column_array;
        if($src_column_size == $des_column_size){
            for my $idx(0 .. $src_column_size - 1){
                my $src_column_type = $src_column_array[$idx]->[1];
                my $des_column_type = $des_column_array[$idx]->[1];
                if($src_column_type ne $des_column_type){
                    $diff_column = "column type ".($idx + 1)." SRC $src_column_type DES $des_column_type";
                    last;
                }
            }
        }else{
            $diff_column = "column number SRC $src_column_size DES $des_column_size";
        }
    }
    if("r" eq $relkind && "" eq $diff_column){
        return "";
    }elsif("r" eq $relkind && $FORCE){
        # NOTE(review): the first END releases the NOWAIT lock before the DROP
        # runs, so the lock only acts as a "table busy" pre-check; the second
        # END has no open transaction. Kept as-is; confirm this is intended.
        $recreate_sql = $recreate_sql.qq{BEGIN;LOCK TABLE "$des_schema"."$des_relation" IN ACCESS EXCLUSIVE MODE NOWAIT;END;DROP TABLE IF EXISTS "$des_schema"."$des_relation" CASCADE;END;\n};
        logMessage("NOTICE","Destination table not match the source $diff_column, force recreate it:[$des_schema.$des_relation]");
    }elsif("" eq $relkind && $FORCE){
        logMessage("NOTICE","Destination relation not exists, force create it:[$des_schema.$des_relation]");
    }elsif("" eq $relkind && !$FORCE){
        return "Destination relation not exists:[$des_schema.$des_relation]";
    }else{
        return "FROM $src_table TO $des_schema.$des_relation has unknown problem $diff_column";
    }
    if(("" eq $relkind ||  "" ne $diff_column) && $FORCE){
        $recreate_sql = $recreate_sql.qq{CREATE TABLE "$des_schema"."$des_relation"(\n};
        my $comment_sql = "\n";
        for my $idx(0 .. $src_column_size - 1){
            my ($column_name,$column_type,$relation_dsc,$column_dsc) = @{$src_column_array[$idx]};
            # Descriptions arrive base64-encoded; escape quotes for the SQL literal.
            ($relation_dsc,$column_dsc) = (decode($relation_dsc),decode($column_dsc));
            $relation_dsc =~ s/'/''/g;
            $column_dsc =~ s/'/''/g;
            # The same quote() rendering is used for the column in CREATE TABLE
            # and in COMMENT ON COLUMN, so both refer to the identical identifier.
            my $quoted_column = quote($column_name);
            $recreate_sql = $recreate_sql.$quoted_column."\t".$column_type;
            $recreate_sql = $recreate_sql.($idx < $src_column_size-1 ? ",\n" : "\n");
            if($idx == 0 && "" ne $relation_dsc){$comment_sql = $comment_sql.qq{COMMENT ON TABLE "$des_schema"."$des_relation" IS '$relation_dsc';\n};}
            if("" ne $column_dsc){$comment_sql = $comment_sql.qq{COMMENT ON COLUMN "$des_schema"."$des_relation".}.$quoted_column.qq{ IS '$column_dsc';\n};}
        }
        if($DES_DATABASE_VERSION > 5){
            $recreate_sql = $recreate_sql.")WITH(APPENDONLY=TRUE,COMPRESSTYPE=ZSTD)DISTRIBUTED RANDOMLY;";
        }else{
            $recreate_sql = $recreate_sql.")WITH(APPENDONLY=TRUE,COMPRESSTYPE=ZLIB,COMPRESSLEVEL=5)DISTRIBUTED RANDOMLY;";
        }
        if("" ne $FORCE_OWNER){
            $recreate_sql = $recreate_sql.qq{ALTER TABLE "$des_schema"."$des_relation" OWNER TO $FORCE_OWNER;};
        }
        my ($code,$value) = queryResult("DES",$recreate_sql.$comment_sql,"CV");
        if($code){
            return "Recreate destination relation occur error:[$value]";
        }else{
            return "";
        }
    }
}
# Fetch the live (non-dropped) columns of a source table in attnum order.
#
# Params:
#   $schema_name, $relation_name - source table identifiers (unquoted)
# Returns:
#   ($code, \@rows) from queryResult("SRC", ..., "CR"). Each row holds
#   [attname, formatted type]; in $FORCE mode it also holds the base64
#   table description and base64 column description (newlines removed).
sub getSrcTableColumn{
    my ($schema_name,$relation_name) = @_;
    # Escape single quotes so the names are safe inside SQL string literals.
    (my $schema_lit = $schema_name) =~ s/'/''/g;
    (my $relation_lit = $relation_name) =~ s/'/''/g;
    my $query_sql;
    if($FORCE){
        # pg_description is keyed by (objoid, classoid, objsubid). objoid alone
        # is not guaranteed unique across system catalogs, and a stray match
        # would duplicate column rows, so restrict both joins to pg_class.
        $query_sql = qq{select attname,format_type(atttypid,atttypmod),
        replace(encode(textsend(d1.description),'base64'),chr(10),'') dsc1,
        replace(encode(textsend(d2.description),'base64'),chr(10),'') dsc2
        from pg_namespace n join pg_class c on c.relnamespace=n.oid
        join pg_attribute a on c.oid=a.attrelid
        left join pg_description d1 on c.oid=d1.objoid and d1.classoid='pg_class'::regclass and d1.objsubid=0
        left join pg_description d2 on c.oid=d2.objoid and d2.classoid='pg_class'::regclass and a.attnum=d2.objsubid
        where  n.nspname='$schema_lit' and c.relname='$relation_lit'
        and a.attnum>0 and not a.attisdropped order by a.attnum;};
    }else{
        $query_sql = qq{select attname,format_type(atttypid,atttypmod) from
        pg_namespace n join pg_class c on c.relnamespace=n.oid
        join pg_attribute a on c.oid=a.attrelid
        where  n.nspname='$schema_lit' and c.relname='$relation_lit'
        and a.attnum>0 and not a.attisdropped order by a.attnum;};
    }
    my ($code,@column_array) = queryResult("SRC",$query_sql,"CR");
    return ($code,\@column_array);
}
# Worker-thread body: pull task groups from $TASK_QUEUE and transfer each
# table through the per-thread external table
# gpdbtransfer.transform_<KEY>_<thread_index>. Every task produces exactly one
# status list on $MSG_QUEUE; a final undef on $MSG_QUEUE marks this worker's end.
#
# Params:
#   $thread_index - worker number, used in the external table name and the
#                   application name passed to the source side
sub executeTransfer{
    my ($thread_index) = @_;
    my $des_ext_table = 'gpdbtransfer.transform_'.$KEY."_".$thread_index;
    my @task_array;
    my $task_index = 0;
    my $base64_pass = encode("" eq $SRC_PASSWD ? "--" : $SRC_PASSWD);
    while(1){
        # Drop the staging external table left by the previous task (also runs once before exit).
        queryResult("DES","DROP EXTERNAL TABLE IF EXISTS $des_ext_table;","CV");
        if($task_index >= @task_array){
            my $task_list = $TASK_QUEUE->dequeue();
            # undef is the end-of-work marker enqueued once per worker.
            last if !defined $task_list;
            @task_array = @$task_list;
            $task_index = 0;
        }
        my @stat_msg :shared = _executeTransferTask($thread_index,$des_ext_table,$base64_pass,@{$task_array[$task_index]});
        $MSG_QUEUE->enqueue(\@stat_msg);
        $task_index += 1;
    }
    $MSG_QUEUE->enqueue(undef);
}

# Transfer one table and return its status list for executeMessage:
#   ("FAILED", $message) or
#   ("SUCCESS", $message, $src_scma, $src_rel, "$src_tim;$count").
# An "INFO" start message is enqueued directly once the DDL succeeds.
sub _executeTransferTask{
    my ($thread_index,$des_ext_table,$base64_pass,$src_table,$policytype,$des_table,$where,$relkind,$dep_tables,$src_tim,$count) = @_;
    my ($code,$value);
    my $from_to_info = qq{FROM $src_table TO $des_table};
    if($src_table eq $des_table){
        $from_to_info = qq{FOR TABLE NAME $des_table};
    }
    $where = "" eq $where ? $GLOBAL_CONDITION : " where ".$where;
    my ($src_scma,$src_rel) = split(/\./,$src_table,2);
    my $src_tableqq = qq{"$src_scma"."$src_rel"};
    my ($des_schema,$des_relation) = split(/\./,$des_table,2);
    my $des_tableqq = qq{"$des_schema"."$des_relation"};
    if(!$IGNORE_CHECK){
        my ($src_code,$src_column) = getSrcTableColumn($src_scma,$src_rel);
        my $problem;
        if($src_code != 0){
            $problem = "Get source relation columns occur error:\n".join("\n",@$src_column);
        }else{
            $problem = checkTableDefinition($src_column,$src_table,$des_schema,$des_relation);
        }
        return ("FAILED",$problem) if "" ne $problem;
    }
    # Decide whether data must be pulled through the source master ("SLOW")
    # rather than segment to segment ("FAST").
    my $from_master = 0;
    if($MASTER_ONLY){
        $from_master = 1;
    }elsif("v" eq $relkind){
        ($code,$value) = queryResult("SRC",qq{explain select * from $src_tableqq}.qq{$where;},"CV");
        if($code > 0){
            return ("FAILED","Can't get view's explain, $from_to_info [$code],[$value]");
        }
        if($value =~ /Gather Motion 1:1 /){
            if($SRC_SEG_SIZE > 1){
                $from_master = 1;
            }
        }elsif($value =~ /Gather Motion/){
            # Any motion other than the top-level gather forces master mode.
            $value =~ s/Gather Motion/Ignore Check/;
            if($value =~ / Motion/){
                $from_master = 1;
            }
        }else{
            $from_master = 1;
        }
    }elsif($policytype eq "r"){
        $from_master = 1;
    }
    my $lock_sql = "--";
    if("" ne $dep_tables){
        $lock_sql = 'LOCK TABLE '.$dep_tables.' IN ACCESS SHARE MODE NOWAIT;';
    }
    my $base64_lock = encode($lock_sql);
    my $base64_where = encode($where);
    srand;
    my $random = ($SRC_SEG_SIZE eq $DES_SEG_SIZE) ? 0 : int(rand(1*$SRC_SEG_SIZE*$DES_SEG_SIZE));
    my $des_ext_ddl = "DROP EXTERNAL TABLE IF EXISTS $des_ext_table;\n";
    $des_ext_ddl = $des_ext_ddl."CREATE EXTERNAL WEB TABLE $des_ext_table(\n";
    $des_ext_ddl = $des_ext_ddl.qq{LIKE $des_tableqq};
    $des_ext_ddl = $des_ext_ddl."\n)EXECUTE E'sh /tmp/fifo/gpdbtransfercat.sh $SRC_GPHOME.$KEY.$SRC_USER.$base64_pass.$thread_index.$DES_SEG_SIZE";
    $des_ext_ddl = $des_ext_ddl.($from_master ? ".M" : ".S");
    $des_ext_ddl = $des_ext_ddl.".\$GP_SEG_PORT.$SRC_DATABASE.$src_scma.$src_rel.$base64_lock.$ENCODING.$random.$COMPRESS.$base64_where'";
    if($from_master){
        $des_ext_ddl = $des_ext_ddl." ON MASTER";
    }else{
        $des_ext_ddl = $des_ext_ddl." ON ALL";
    }
    $des_ext_ddl = $des_ext_ddl."\nFORMAT 'TEXT' ENCODING '$ENCODING';";
    ($code,$value) = queryResult("DES",$des_ext_ddl,"CV");
    if($code > 0){
        return ("FAILED","DDL occur error, $from_to_info [$code],[$value]");
    }
    my @info_msg :shared = ("INFO","Start transfer $from_to_info TYPE $relkind MODE ".($from_master ? "SLOW" : "FAST")." WITH $where");
    $MSG_QUEUE->enqueue(\@info_msg);
    my $use_transaction = $TRANSACTION || $DES_DATABASE_VERSION > 5;
    my $trans_sql = $use_transaction ? "BEGIN;\n" : "";
    if($NEED_TRUNCATE || ($NEED_DELETE && "" eq $where)){
        $trans_sql = $trans_sql.qq{LOCK TABLE $des_tableqq IN ACCESS EXCLUSIVE MODE NOWAIT;\nTRUNCATE TABLE $des_tableqq;\n};
    }elsif($NEED_DELETE){
        $trans_sql = $trans_sql.qq{LOCK TABLE $des_tableqq IN EXCLUSIVE MODE NOWAIT;\nDELETE FROM $des_tableqq}.$where.";\n";
    }else{
        $trans_sql = $trans_sql.qq{LOCK TABLE $des_tableqq IN ROW EXCLUSIVE MODE NOWAIT;\n};
    }
    $trans_sql = $trans_sql.qq{INSERT INTO $des_tableqq SELECT * FROM $des_ext_table;}.($use_transaction ? "\nEND;" : "");
    my $start = time();
    ($code,$value) = queryResult("DES",$trans_sql,"CV");
    my $duration = time() - $start;
    if($code > 0){
        my @failed = ("FAILED","$from_to_info [$code],[$value]");
        # Cancel source backends still serving this worker's external table.
        my $app_name = qq{GPDBTRANSFER.$KEY.$thread_index};
        # pg_stat_activity.procpid was renamed to pid in PostgreSQL 9.2
        # (Greenplum 6+). Assumes $SRC_DATABASE_VERSION uses the same
        # major-version numbering as $DES_DATABASE_VERSION -- TODO confirm.
        my $pid_column = $SRC_DATABASE_VERSION > 5 ? "pid" : "procpid";
        my $clean_sql;
        if($from_master){
            $clean_sql = qq{SELECT pg_cancel_backend($pid_column) FROM pg_stat_activity WHERE application_name = '$app_name';};
        }else{
            $clean_sql = qq{SELECT pg_cancel_backend($pid_column) FROM (SELECT (pg_stat_get_activity(NULL::integer)).* FROM gp_dist_random('gp_id')) t WHERE application_name = '$app_name';};
        }
        queryResult("SRC",$clean_sql,"CV");
        return @failed;
    }
    # Take the row count from the "INSERT 0 <rows>" command tag, if present.
    for my $line(split(/\n/,$value)){
        if($line =~ /^INSERT/){
            $value = (split(/ /,$line))[2];
        }
    }
    return ("SUCCESS","$from_to_info ROWS $value TYPE $relkind TIME $duration S WITH".($where eq "" ? "" :  " $where"),$src_scma,$src_rel,$src_tim.";".$count);
}
# Message-thread body: log status lists coming from the workers and, in
# increment mode, record the stat of each successfully transferred table.
# Stops after receiving one undef end marker per worker ($BATCH_SIZE).
#
# Returns: number of FAILED messages seen.
sub executeMessage{
    my ($end_index,$error_index,$success_index) = (0,0,0);
    my $tables_size = @TABLE_ARRAY;
    while(1){
        my $msg = $MSG_QUEUE->dequeue();
        if(!defined $msg){
            $end_index += 1;
            last if $end_index == $BATCH_SIZE;
            next;
        }
        my ($type,$text,$scma,$rel,$last_stat) = @$msg;
        my $full_name = $SRC_DATABASE.'.'.$scma.'.'.$rel;
        if("SUCCESS" eq $type){
            $success_index += 1;
            my $base64_name = trim(encode($full_name));
            if($INCREMENT && "" ne $last_stat){
                my $stat_record = $base64_name.";".$full_name.";".$last_stat;
                if(!_replaceLastStatRecord($LAST_STAT_FILE_NAME,$base64_name,$stat_record)){
                    logMessage("WARN","Can't update last stat file [$LAST_STAT_FILE_NAME] for table: $full_name");
                }
            }
            logMessage($type,"($success_index/$error_index/$tables_size) ".$text);
        }elsif("FAILED" eq $type){
            $error_index += 1;
            logMessage($type,"($success_index/$error_index/$tables_size) ".$text);
        }else{
            logMessage($type,$text);
        }
    }
    return $error_index;
}

# Remove every line of $file that starts with "$key;" and append $record.
# Done in Perl rather than via sed/echo: base64 keys may contain "/" (which
# would end a sed address) and table names may contain shell metacharacters.
# The file is rewritten via a temp file + rename, keeping the original mode.
# Returns 1 on success, 0 on failure.
sub _replaceLastStatRecord{
    my ($file,$key,$record) = @_;
    return 0 if !defined $file || "" eq $file;
    my $prefix = $key.";";
    my @kept = ();
    if(open(my $in,"<",$file)){
        @kept = grep { index($_,$prefix) != 0 } <$in>;
        close($in);
    }
    # Make sure the appended record starts on its own line.
    if(@kept && $kept[-1] !~ /\n\z/){
        $kept[-1] .= "\n";
    }
    my $tmp_file = $file.".tmp.".$$;
    open(my $out,">",$tmp_file) or return 0;
    print {$out} @kept,$record,"\n";
    if(!close($out)){
        unlink($tmp_file);
        return 0;
    }
    my @file_stat = stat($file);
    chmod($file_stat[2] & 07777,$tmp_file) if @file_stat;
    if(!rename($tmp_file,$file)){
        unlink($tmp_file);
        return 0;
    }
    return 1;
}
# Queue all tables as task groups, start $BATCH_SIZE worker threads plus the
# message thread, and wait for them.
#
# Returns: -1 when there is nothing to transfer outside increment mode,
#          0 when there is nothing to transfer in increment mode,
#          otherwise the failed-table count returned by executeMessage.
sub startTransfer{
    my $tables_size = @TABLE_ARRAY;
    if(0 == $tables_size){
        logMessage("INFO","No table will be transfer, exit");
        return $INCREMENT ? 0 : -1;
    }
    logMessage("INFO","Number of tables should be transfer is: $tables_size");
    $TASK_QUEUE = Thread::Queue->new();
    $MSG_QUEUE = Thread::Queue->new();
    # Consecutive rows with the same parent (p_scma.p_rel) share one group only
    # in transaction mode against a pre-6 destination; otherwise one table per group.
    my $group_by_parent = $TRANSACTION && $DES_DATABASE_VERSION < 6;
    my @first_group :shared = ();
    my $group = \@first_group;
    my ($pre_p_scma,$pre_p_rel) = ("","");
    for my $row(@TABLE_ARRAY){
        my ($p_scma,$p_rel,$src_table,$policytype,$des_table,$where,$relkind,$dep_tables,$src_tim,$count) = @$row;
        my @task :shared = ($src_table,$policytype,$des_table,$where,$relkind,$dep_tables,$src_tim,$count);
        my $same_parent = $p_scma eq $pre_p_scma && $p_rel eq $pre_p_rel;
        if(@$group && !($group_by_parent && $same_parent)){
            $TASK_QUEUE->enqueue($group);
            my @next_group :shared = ();
            $group = \@next_group;
        }
        push @$group,\@task;
        ($pre_p_scma,$pre_p_rel) = ($p_scma,$p_rel);
    }
    # Flush the last group explicitly; it is non-empty because $tables_size > 0.
    $TASK_QUEUE->enqueue($group);
    for my $index(0 .. $BATCH_SIZE - 1){
        # One undef end marker per worker; each worker exits on its first undef.
        $TASK_QUEUE->enqueue(undef);
        my $task_thread = threads->new(\&executeTransfer,$index);
        push @TASK_THREAD,$task_thread;
    }
    $MSG_THREAD = threads->new(\&executeMessage);
    for my $thread(@TASK_THREAD){
        $thread->join();
    }
    @TASK_THREAD = ();
    my $value = $MSG_THREAD->join();
    $MSG_THREAD = "";
    return $value;
}
# Program entry: parse/validate options, run all preparation steps in order,
# transfer the tables and exit with a status code (0 ok, 11 nothing
# transferred, 33 some tables failed).
#
# Params:
#   $run_command - the command line string, logged for reference
sub main{
    my ($run_command) = @_;
    getOption();
    logMessage("INFO","Start gpdbtransfer process".("." x 66));
    logMessage("INFO","Run command: ".$run_command);
    checkOption();
    # Set only after option checking; presumably enables writing log messages to the log file.
    $WRITE_TO_FILE = 1;
    # Preparation steps, executed strictly in this order.
    my @prepare_steps = (
        \&getKeyWord,
        \&getVersion,
        \&checkSrcRandomTable,
        \&checkSrcCluster,
        \&checkSrcSegHba,
        \&checkSrcHeapStatFunction,
        \&checkDestCluster,
        \&generateSrcHostMap,
        \&generateSrcClusterInfo,
        \&checkCleverCommand,
        \&getReplicationTable,
        \&processTableArray,
        \&tryCheckpoint,
        \&processLastTransferStat,
        \&checkConflictProcess,
        \&checkTransferSchema,
        \&checkContentConsistent,
    );
    for my $step(@prepare_steps){
        $step->();
    }
    my $error_count = startTransfer();
    my ($finish_msg,$exit_code) =
        $error_count == -1 ? ("Finish transfer with no table be transfer......",11)
      : $error_count != 0  ? ("Finish transfer with failed......",33)
      :                      ("Finish transfer with all success......",0);
    logMessage("INFO",$finish_msg);
    exitMain($exit_code);
}
# Script entry point: unbuffered STDOUT/STDERR, then run main with the full command line.
my $command_string = sprintf("%s %s",$0,join(" ",@ARGV));
for my $handle(\*STDOUT,\*STDERR){
    $handle->autoflush(1);
}
main($command_string);
